You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/15 09:16:32 UTC

[flink] branch master updated: [FLINK-12488][metrics] Pass Status group to NetworkEnvironment

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 753a11e  [FLINK-12488][metrics] Pass Status group to NetworkEnvironment
753a11e is described below

commit 753a11ea2c31c2cf2a24d18c5442b133004acba0
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 10 14:56:40 2019 +0200

    [FLINK-12488][metrics] Pass Status group to NetworkEnvironment
---
 .../java/org/apache/flink/runtime/metrics/util/MetricUtils.java   | 5 +++--
 .../org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java  | 8 +++++---
 .../apache/flink/runtime/taskexecutor/TaskManagerServices.java    | 4 ++--
 3 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index c30ef62..4ce8f5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics.util;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
@@ -89,7 +90,7 @@ public class MetricUtils {
 		return jobManagerMetricGroup;
 	}
 
-	public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
+	public static Tuple2<TaskManagerMetricGroup, MetricGroup> instantiateTaskManagerMetricGroup(
 			MetricRegistry metricRegistry,
 			String hostName,
 			ResourceID resourceID,
@@ -107,7 +108,7 @@ public class MetricUtils {
 		if (systemResourceProbeInterval.isPresent()) {
 			instantiateSystemMetrics(taskManagerMetricGroup, systemResourceProbeInterval.get());
 		}
-		return taskManagerMetricGroup;
+		return Tuple2.of(taskManagerMetricGroup, statusGroup);
 	}
 
 	public static void instantiateStatusMetrics(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 3df99dd..4288573 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -20,12 +20,14 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -358,7 +360,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 				remoteAddress,
 				localCommunicationOnly);
 
-		TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
+		Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
 			metricRegistry,
 			TaskManagerLocation.getHostName(remoteAddress),
 			resourceID,
@@ -366,7 +368,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 
 		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
 			taskManagerServicesConfiguration,
-			taskManagerMetricGroup,
+			taskManagerMetricGroup.f1,
 			resourceID,
 			rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
 			EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
@@ -382,7 +384,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			highAvailabilityServices,
 			taskManagerServices,
 			heartbeatServices,
-			taskManagerMetricGroup,
+			taskManagerMetricGroup.f0,
 			metricQueryServiceAddress,
 			blobCacheService,
 			fatalErrorHandler);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 395a207..e2ec942 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -232,7 +232,7 @@ public class TaskManagerServices {
 	 */
 	public static TaskManagerServices fromConfiguration(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
-			TaskManagerMetricGroup taskManagerMetricGroup,
+			MetricGroup taskManagerMetricGroup,
 			ResourceID resourceID,
 			Executor taskIOExecutor,
 			long freeHeapMemoryWithDefrag,