You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/10 13:03:18 UTC
[flink] 06/06: [FLINK-12488][metrics] Pass Status group to NetworkEnvironment
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch travis_jdk9_test
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b3ae6dbb13a1ab7ca06ff5190b574b4a6d8cbe3f
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 10 14:56:40 2019 +0200
[FLINK-12488][metrics] Pass Status group to NetworkEnvironment
---
.../java/org/apache/flink/runtime/metrics/util/MetricUtils.java | 5 +++--
.../org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 8 +++++---
.../apache/flink/runtime/taskexecutor/TaskManagerServices.java | 4 ++--
3 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index c30ef62..4ce8f5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics.util;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Gauge;
@@ -89,7 +90,7 @@ public class MetricUtils {
return jobManagerMetricGroup;
}
- public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
+ public static Tuple2<TaskManagerMetricGroup, MetricGroup> instantiateTaskManagerMetricGroup(
MetricRegistry metricRegistry,
String hostName,
ResourceID resourceID,
@@ -107,7 +108,7 @@ public class MetricUtils {
if (systemResourceProbeInterval.isPresent()) {
instantiateSystemMetrics(taskManagerMetricGroup, systemResourceProbeInterval.get());
}
- return taskManagerMetricGroup;
+ return Tuple2.of(taskManagerMetricGroup, statusGroup);
}
public static void instantiateStatusMetrics(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 3df99dd..4288573 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -20,12 +20,14 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -358,7 +360,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
remoteAddress,
localCommunicationOnly);
- TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
+ Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
TaskManagerLocation.getHostName(remoteAddress),
resourceID,
@@ -366,7 +368,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
- taskManagerMetricGroup,
+ taskManagerMetricGroup.f1,
resourceID,
rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
@@ -382,7 +384,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
highAvailabilityServices,
taskManagerServices,
heartbeatServices,
- taskManagerMetricGroup,
+ taskManagerMetricGroup.f0,
metricQueryServiceAddress,
blobCacheService,
fatalErrorHandler);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index a3e9066..c1cd8dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -232,7 +232,7 @@ public class TaskManagerServices {
*/
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
- TaskManagerMetricGroup taskManagerMetricGroup,
+ MetricGroup taskManagerMetricGroup,
ResourceID resourceID,
Executor taskIOExecutor,
long freeHeapMemoryWithDefrag,