You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/25 18:49:00 UTC

[57/89] [abbrv] flink git commit: [FLINK-4452] [metrics] TaskManager network buffer gauges

[FLINK-4452] [metrics] TaskManager network buffer gauges

Adds gauges for the number of total and available TaskManager network
memory segments.

This closes #2408


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28743cfb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28743cfb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28743cfb

Branch: refs/heads/flip-6
Commit: 28743cfb86545cf9eaf4ec2cf37ec460a13f3537
Parents: 58165d6
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Aug 23 10:46:48 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Aug 24 09:02:15 2016 -0400

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  9 +++++++
 .../flink/runtime/taskmanager/TaskManager.scala | 25 +++++++++++++++++---
 2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28743cfb/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 023bef9..3a148e1 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -335,6 +335,15 @@ Flink exposes the following system metrics:
       <td></td>
     </tr>
     <tr>
+      <th rowspan="2"><strong>TaskManager.Status</strong></th>
+      <td>Network.AvailableMemorySegments</td>
+      <td>The number of unused memory segments.</td>
+    </tr>
+    <tr>
+      <td>Network.TotalMemorySegments</td>
+      <td>The number of allocated memory segments.</td>
+    </tr>
+    <tr>
       <th rowspan="19"><strong>TaskManager.Status.JVM</strong></th>
       <td>ClassLoader.ClassesLoaded</td>
       <td>The total number of classes loaded since the start of the JVM.</td>

http://git-wip-us.apache.org/repos/asf/flink/blob/28743cfb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5a95143..72ec2ac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -965,7 +965,7 @@ class TaskManager(
     taskManagerMetricGroup = 
       new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
     
-    TaskManager.instantiateStatusMetrics(taskManagerMetricGroup)
+    TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network)
     
     // watch job manager to detect when it dies
     context.watch(jobManager)
@@ -2357,9 +2357,16 @@ object TaskManager {
     metricRegistry
   }
 
-  private def instantiateStatusMetrics(taskManagerMetricGroup: MetricGroup) : Unit = {
-    val jvm = taskManagerMetricGroup
+  private def instantiateStatusMetrics(
+      taskManagerMetricGroup: MetricGroup,
+      network: NetworkEnvironment)
+    : Unit = {
+    val status = taskManagerMetricGroup
       .addGroup("Status")
+
+    instantiateNetworkMetrics(status.addGroup("Network"), network)
+
+    val jvm = status
       .addGroup("JVM")
 
     instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
@@ -2369,6 +2376,18 @@ object TaskManager {
     instantiateCPUMetrics(jvm.addGroup("CPU"))
   }
 
+  private def instantiateNetworkMetrics(
+        metrics: MetricGroup,
+        network: NetworkEnvironment)
+    : Unit = {
+    metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] {
+      override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments
+    })
+    metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] {
+      override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments
+    })
+  }
+
   private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
     val mxBean = ManagementFactory.getClassLoadingMXBean