You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/05/09 12:34:36 UTC

flink git commit: [FLINK-1792] Add processCPULoad in metricsRegistry, add button to show/hide graphs, add summary for metrics

Repository: flink
Updated Branches:
  refs/heads/master 84dcb7c20 -> bc8f1978c


[FLINK-1792] Add processCPULoad in metricsRegistry, add button to
show/hide graphs, add summary for metrics

This closes #553

(Also close the mesos pull request)
This closes #251


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc8f1978
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc8f1978
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc8f1978

Branch: refs/heads/master
Commit: bc8f1978c9f553064047b275645944a10fc6531a
Parents: 84dcb7c
Author: bhatsachin <bh...@gmail.com>
Authored: Tue Mar 31 13:48:08 2015 +0530
Committer: Robert Metzger <rm...@apache.org>
Committed: Sat May 9 12:08:26 2015 +0200

----------------------------------------------------------------------
 .../web-docs-infoserver/js/taskmanager.js       | 139 ++++++++++++++++---
 .../web-docs-infoserver/taskmanagers.html       |   3 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  36 +++++
 3 files changed, 161 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc8f1978/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
index f038a3b..1ea9a41 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
@@ -60,6 +60,9 @@ var memoryValues = ["memory.non-heap.used" , "memory.flink.used", "memory.heap.u
 
 var metricsLimit = 3;
 
+// number of minutes for which the summary will be provided
+var summaryTime = 10;
+
 /**
 Create rickshaw graph for the specified taskManager id (tmid).
 **/
@@ -69,6 +72,7 @@ function createGraph(tmId, maxload, maxmem) {
     var scales = [];
     scales.push(d3.scale.linear().domain([0, maxmem]));
     scales.push(d3.scale.linear().domain([0, maxload]).nice());
+    scales.push(d3.scale.linear().domain([0,100]));
     for(i in memoryValues) {
         var value = memoryValues[i];
         taskManagerMemory[tmId][value] = [];
@@ -91,6 +95,16 @@ function createGraph(tmId, maxload, maxmem) {
         renderer: 'line',
         stroke: 'rgba(0,0,0,0.5)'
     });
+    taskManagerMemory[tmId]["cpuLoad"] = [];
+    // add cpu load series
+    series.push({
+        color: palette.color(),
+        scale: scales[2],
+        data: taskManagerMemory[tmId]["cpuLoad"],
+        name: "CPU Load",
+        renderer: 'line',
+        stroke: 'rgba(0,0,0,0.5)'
+    });
 
     // remove message
     $("#chart-"+tmId).html("");
@@ -121,7 +135,7 @@ function createGraph(tmId, maxload, maxmem) {
     var y_axis_load = new Rickshaw.Graph.Axis.Y.Scaled( {
         graph: graph,
         orientation: 'right',
-        scale: scales[1],
+        scale: scales[2],
         grid: false,
         element: document.getElementById("y_axis-load-"+tmId)
     } );
@@ -199,11 +213,14 @@ function processTMdata(json) {
 		// check if taskManager has a row
 		tmRow = $("#"+tmRowIdCssName);
 		if(tmRow.length == 0) {
-		    var tmMemoryBox = "<div class=\"chart_container\" id=\"chart_container-"+tm.instanceID+"\">"+
+		    // the *-memory_stats div contains only the statistics, whereas the chart_container-* div contains the graph
+		    var tmMemoryBox =  "<button type=\"button\" class=\"btn btn-default\" id=\"graph_button-"+tm.instanceID+"\" onclick=\"hideShowGraph('"+tm.instanceID+"')\"></button>"+"<br>"+"<br>"+
+		                       "<div id=\""+tmRowIdCssName+"-memory_stats"+"\">"+"</div>"+
+		                       "<div class=\"chart_container\" id=\"chart_container-"+tm.instanceID+"\">"+
                                   "<div class=\"y_axis\" id=\"y_axis-"+tm.instanceID+"\"><p class=\"axis_label\">Memory</p></div>"+
                                   "<div class=\"chart\" id=\"chart-"+tm.instanceID+"\"><i>Waiting for first Heartbeat to arrive</i></div>"+
-                                  "<div class=\"y_axis-load\" id=\"y_axis-load-"+tm.instanceID+"\"><p class=\"axis_label\">Load</p></div>"+
-                               "<div class=\"legend\" id=\"legend-"+tm.instanceID+"\"></div>"+
+                                  "<div class=\"y_axis-load\" id=\"y_axis-load-"+tm.instanceID+"\"><p class=\"axis_label\">CPU Load</p></div>"+
+                                  "<div class=\"legend\" id=\"legend-"+tm.instanceID+"\"></div>"+
                                "</div>";
 
             var content = "<tr id=\""+tmRowIdCssName+"\">" +
@@ -224,6 +241,31 @@ function processTMdata(json) {
 		    taskManagerGraph[tm.instanceID].render();
         //    taskManagerGraph[tm.instanceID].resize();
 		}
+
+        // html dump for memory statistics of task manager
+        var tmMemStats = $("#"+tmRowIdCssName+"-memory_stats");
+        tmMemStats.html("<table><tr><td><b>CPU Load</b></td><td></td></tr>"+
+                        "<tr><td>Current: <span id=\""+tmRowIdCssName+"-cpuLoad\"></span>%</td>"+"<td>Avg: <span id=\""+tmRowIdCssName+"-avg_cpuLoad\"></span>%</td></tr>"+
+                        "<tr><td><b>OS Load</b></td><td></td></tr>"+
+                        "<tr><td>Current: <span id=\""+tmRowIdCssName+"-osLoad\"></span></td>"+"<td>Avg: <span id=\""+tmRowIdCssName+"-avg_load\"></span></td></tr>"+
+                        "<tr><td><b>Memory.heap.used</b></td><td></td></tr>"+
+                        "<tr><td>Current: <span id=\""+tmRowIdCssName+"-memHeapUsed\"></span></td>"+"<td>Avg: <span id=\""+tmRowIdCssName+"-avg_memory_heap_used\"></span></td></tr>"+
+                        "<tr><td><b>Memory.flink.used</b></td><td></td></tr>"+
+                        "<tr><td>Current: <span id=\""+tmRowIdCssName+"-memFlinkUsed\"></span></td>"+"<td>Avg: <span id=\""+tmRowIdCssName+"-avg_memory_flink_used\"></span></td></tr>"+
+                        "<tr><td><b>Memory.non-heap.used</b></td><td></td></tr>"+
+                        "<tr><td>Current: <span id=\""+tmRowIdCssName+"-memNonHeapUsed\"></span></td>"+"<td>Avg: <span id=\""+tmRowIdCssName+"-avg_memory_non-heap_used\"></span></td></tr></table>");
+
+        // preserve the show/hide state of graph after update interval
+        var graphElement =  $("#chart_container-"+tm.instanceID);
+        if(graphElement.is(':visible')){
+            $("#graph_button-"+tm.instanceID).text("Hide Detailed Graph");
+            $("#tm-row-"+tm.instanceID+"-memory_stats").hide();
+        } else {
+            $("#graph_button-"+tm.instanceID).text("Show Detailed Graph");
+            $("#tm-row-"+tm.instanceID+"-memory_stats").show();
+        }
+
+
         // fill (update) row with contents
         // memory statistics
         var time = getUnixTime();
@@ -234,26 +276,51 @@ function processTMdata(json) {
             switch(valueKey) {
                 case "memory.heap.used":
                     var value = metricsJSON.gauges[valueKey].value - flinkMemory;
+                    $("#"+tmRowIdCssName+"-memHeapUsed").html(formatBase1024KMGTP(value));
                     break;
                 case "memory.non-heap.used":
                     var value = metricsJSON.gauges[valueKey].value;
+                    $("#"+tmRowIdCssName+"-memNonHeapUsed").html(formatBase1024KMGTP(value));
                     break;
                 case "memory.flink.used":
                     var value = flinkMemory;
+                    $("#"+tmRowIdCssName+"-memFlinkUsed").html(formatBase1024KMGTP(value));
                     break;
             }
             taskManagerMemory[tm.instanceID][valueKey].push({x: time, y: value})
         }
-        // load
-        taskManagerMemory[tm.instanceID]["load"].push({x:time, y:metricsJSON.gauges["load"].value });
+        // os load
+        var osLoadValue = Number(metricsJSON.gauges["load"].value.toFixed(2));
+        taskManagerMemory[tm.instanceID]["load"].push({x:time, y:osLoadValue });
+        $("#"+tmRowIdCssName+"-osLoad").html(osLoadValue);
+
+        // cpu load
+        var cpuLoadValue = Number((metricsJSON.gauges["cpuLoad"].value*100).toFixed(2));
+        taskManagerMemory[tm.instanceID]["cpuLoad"].push({x:time, y:cpuLoadValue });
+        if(cpuLoadValue != -100){
+            $("#"+tmRowIdCssName+"-cpuLoad").html(cpuLoadValue);
+        } else {
+            $("#"+tmRowIdCssName+"-cpuLoad").html("NA"+getTooltipHTML("CPU Load is unavailable as the java version is not 1.7 or above"));
+        }
+
+
+        // generate summary for the last summaryTime minutes
+        var summaryStats = generateSummaryFor(taskManagerMemory[tm.instanceID],summaryTime);
+        // fill the averages
+        for(var statKey in summaryStats){
+            $("#"+tmRowIdCssName+"-avg_"+statKey.replace(/\./g,'_')).html(summaryStats[statKey]);
+        }
 
         if(metricsLimit == -1 || i < metricsLimit) {
             taskManagerGraph[tm.instanceID].update();
         } else {
-            $("#chart_container-"+tm.instanceID).hide();
+            tmMemStats.hide();
         }
 
-
+        // tooltip to show the time used for summary
+        var avgTimeInfo = "";
+        avgTimeInfo = getTooltipHTML("The average values are for the previous "+summaryTime+" minutes");
+        $("#tmTableHeaderMemStat").html("Memory Statistics "+avgTimeInfo);
 
         // info box
         tmInfoBox = $("#"+tmRowIdCssName+"-info");
@@ -314,34 +381,74 @@ function updateLimit(element) {
             $("#metrics-limit-all,#metrics-limit-none").removeClass("active");
             $(element).addClass("active");
             metricsLimit = 3;
-            hideShowGraphs();
+            hideShowMemStats();
             break;
         case 'metrics-limit-all':
             $("#metrics-limit-3,#metrics-limit-none").removeClass("active");
             $(element).addClass("active");
             metricsLimit = -1;
-            hideShowGraphs();
+            hideShowMemStats();
             break;
         case 'metrics-limit-none':
             $("#metrics-limit-all,#metrics-limit-3").removeClass("active");
             $(element).addClass("active");
             metricsLimit = 0;
-            hideShowGraphs();
+            hideShowMemStats();
             break;
     }
 }
+// toggle function for showing/hiding graphs
+function hideShowGraph(tmid){
+    var element = $("#chart_container-"+tmid);
+    if(element.is(":visible")){
+        $("#graph_button-"+tmid).text("Show Detailed Graph");
+        element.hide();
+        $("#tm-row-"+tmid+"-memory_stats").show();
+    } else {
+        $("#graph_button-"+tmid).text("Hide Detailed Graph");
+        element.show();
+        $("#tm-row-"+tmid+"-memory_stats").hide();
+    }
+}
 
-function hideShowGraphs() {
+// hide/show memory statistics for task managers according to metric limits
+function hideShowMemStats() {
     var i = 0;
     for(tmid in taskManagerMemory) {
-       if(metricsLimit == -1 || i++ < metricsLimit) {
-            $("#chart_container-"+tmid).show();
-       } else {
+        // by default hide the graphs when Show/Disable Metrics is clicked
+        if($("#chart_container-"+tmid).is(":visible")){
+            $("#graph_button-"+tmid).text("Show Detailed Graph");
             $("#chart_container-"+tmid).hide();
-       }
+        }
+        if(metricsLimit == -1 || i++ < metricsLimit) {
+            $("#tm-row-"+tmid+"-memory_stats").show();
+            $("#graph_button-"+tmid).show();
+        } else {
+            $("#tm-row-"+tmid+"-memory_stats").hide();
+            $("#graph_button-"+tmid).hide();
+        }
     }
 }
 
+// generate summary for the last *time* minutes
+function generateSummaryFor(stats,time){
+    var summary = {};
+    var numElements = time*12;
+    for(var key in stats){
+        if(key=="cpuLoad" && stats[key][0] && stats[key][0]['y']==-100){
+            summary[key]="NA";
+            continue;
+        }
+        var prevValues = stats[key].slice(numElements*-1);
+        var sum = (prevValues.reduce(function(p,q){return {x:p.x+q.x,y:p.y+q.y}})).y;
+        var avg = Number((sum/(prevValues.length)).toFixed(2));
+        if (avg > 1024){
+            avg = formatBase1024KMGTP(avg);
+        }
+        summary[key]=avg;
+    }
+    return summary;
+}
 
 function updateTaskManagers() {
 	$.ajax({ url : "setupInfo?get=taskmanagers", type : "GET", cache: false, success : function(json) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bc8f1978/flink-runtime/src/main/resources/web-docs-infoserver/taskmanagers.html
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/taskmanagers.html b/flink-runtime/src/main/resources/web-docs-infoserver/taskmanagers.html
index 5558478..c7cb52b 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/taskmanagers.html
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/taskmanagers.html
@@ -50,6 +50,7 @@ under the License.
     .chart_container {
         position: relative;
         font-family: Arial, Helvetica, sans-serif;
+        display: none;
     }
     .chart {
         position: relative;
@@ -157,7 +158,7 @@ under the License.
 	          <div class="table-responsive" id="taskmanagerTable">
                   <table class="table table-bordered table-hover table-striped">
                       <tr id="taskmanagerTable-header"><th>TaskManager</th>
-                          <th>Memory Statistics</th>
+                          <th id="tmTableHeaderMemStat">Memory Statistics</th>
                           <th>Information</th>
                       </tr>
                   </table>

http://git-wip-us.apache.org/repos/asf/flink/blob/bc8f1978/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a6b9133..747cc85 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -22,6 +22,7 @@ import java.io.{File, IOException}
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
 import java.util.concurrent.{TimeUnit, FutureTask}
+import java.lang.reflect.Method
 import java.lang.management.{GarbageCollectorMXBean, ManagementFactory, MemoryMXBean}
 
 import akka.actor._
@@ -1894,6 +1895,41 @@ object TaskManager {
       override def getValue: Double =
         ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
     })
+
+    // Preprocessing steps for registering cpuLoad
+    val fetchCPULoad = getMethodToFetchCPULoad()
+
+    // Log a warning when getProcessCpuLoad is unavailable (e.g. on Java 6)
+    if(fetchCPULoad.isEmpty){
+      LOG.warn("getProcessCpuLoad method not available in the Operating System Bean" +
+        "implementation for this Java runtime environment\n" +
+        Thread.currentThread().getStackTrace)
+    }
+
+    metricRegistry.register("cpuLoad", new Gauge[Double] {
+      override def getValue: Double = {
+        try{
+          fetchCPULoad.map(_.invoke(ManagementFactory.getOperatingSystemMXBean().
+            asInstanceOf[com.sun.management.OperatingSystemMXBean]).
+            asInstanceOf[Double]).getOrElse(-1)
+        } catch {
+          case t: Throwable => {
+            LOG.warn("Error retrieving CPU Load through OperatingSystemMXBean", t)
+            -1
+          }
+        }
+      }
+    })
     metricRegistry
   }
+
+  /**
+   * Looks up the getProcessCpuLoad method on the
+   * com.sun.management.OperatingSystemMXBean interface.
+   * @return Some(method) if a method with that name exists, otherwise None
+   */
+  private def getMethodToFetchCPULoad(): Option[Method] = {
+    val methodsList = classOf[com.sun.management.OperatingSystemMXBean].getMethods()
+    methodsList.filter(_.getName == "getProcessCpuLoad").headOption
+  }
 }