You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/25 18:49:00 UTC
[57/89] [abbrv] flink git commit: [FLINK-4452] [metrics] TaskManager network buffer gauges
[FLINK-4452] [metrics] TaskManager network buffer gauges
Adds gauges for the total number and the available number of TaskManager
network memory segments.
This closes #2408
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28743cfb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28743cfb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28743cfb
Branch: refs/heads/flip-6
Commit: 28743cfb86545cf9eaf4ec2cf37ec460a13f3537
Parents: 58165d6
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Aug 23 10:46:48 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Aug 24 09:02:15 2016 -0400
----------------------------------------------------------------------
docs/monitoring/metrics.md | 9 +++++++
.../flink/runtime/taskmanager/TaskManager.scala | 25 +++++++++++++++++---
2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/28743cfb/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 023bef9..3a148e1 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -335,6 +335,15 @@ Flink exposes the following system metrics:
<td></td>
</tr>
<tr>
+ <th rowspan="2"><strong>TaskManager.Status</strong></th>
+ <td>Network.AvailableMemorySegments</td>
+ <td>The number of unused memory segments.</td>
+ </tr>
+ <tr>
+ <td>Network.TotalMemorySegments</td>
+ <td>The number of allocated memory segments.</td>
+ </tr>
+ <tr>
<th rowspan="19"><strong>TaskManager.Status.JVM</strong></th>
<td>ClassLoader.ClassesLoaded</td>
<td>The total number of classes loaded since the start of the JVM.</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/28743cfb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5a95143..72ec2ac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -965,7 +965,7 @@ class TaskManager(
taskManagerMetricGroup =
new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
- TaskManager.instantiateStatusMetrics(taskManagerMetricGroup)
+ TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network)
// watch job manager to detect when it dies
context.watch(jobManager)
@@ -2357,9 +2357,16 @@ object TaskManager {
metricRegistry
}
- private def instantiateStatusMetrics(taskManagerMetricGroup: MetricGroup) : Unit = {
- val jvm = taskManagerMetricGroup
+ private def instantiateStatusMetrics(
+ taskManagerMetricGroup: MetricGroup,
+ network: NetworkEnvironment)
+ : Unit = {
+ val status = taskManagerMetricGroup
.addGroup("Status")
+
+ instantiateNetworkMetrics(status.addGroup("Network"), network)
+
+ val jvm = status
.addGroup("JVM")
instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
@@ -2369,6 +2376,18 @@ object TaskManager {
instantiateCPUMetrics(jvm.addGroup("CPU"))
}
+ private def instantiateNetworkMetrics(
+ metrics: MetricGroup,
+ network: NetworkEnvironment)
+ : Unit = {
+ metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] {
+ override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments
+ })
+ metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] {
+ override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments
+ })
+ }
+
private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
val mxBean = ManagementFactory.getClassLoadingMXBean