You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 22:27:43 UTC
[3/7] flink git commit: [FLINK-7876] Register TaskManagerMetricGroup
under ResourceID
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
new file mode 100644
index 0000000..407fa8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
+ * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
+ */
+public class MetricRegistryImpl implements MetricRegistry {
+ static final Logger LOG = LoggerFactory.getLogger(MetricRegistryImpl.class);
+
+ private final Object lock = new Object();
+
+ private List<MetricReporter> reporters;
+ private ScheduledExecutorService executor;
+
+ @Nullable
+ private ActorRef queryService;
+
+ @Nullable
+ private String metricQueryServicePath;
+
+ private ViewUpdater viewUpdater;
+
+ private final ScopeFormats scopeFormats;
+ private final char globalDelimiter;
+ private final List<Character> delimiters = new ArrayList<>();
+
+	/**
+	 * Creates a new MetricRegistry and starts all reporters listed in the given configuration.
+	 *
+	 * <p>Each configured reporter is instantiated reflectively through its no-argument constructor,
+	 * opened with its own {@link MetricConfig} and, if it implements {@link Scheduled}, triggered
+	 * periodically on the registry's single-threaded executor. A reporter that cannot be set up is
+	 * logged and skipped; it does not prevent the registry from being created.
+	 *
+	 * @param config configuration providing the scope formats, the global delimiter and the reporter setups
+	 */
+	public MetricRegistryImpl(MetricRegistryConfiguration config) {
+		this.scopeFormats = config.getScopeFormats();
+		this.globalDelimiter = config.getDelimiter();
+
+		// instantiate any custom configured reporters
+		this.reporters = new ArrayList<>();
+
+		List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();
+
+		this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
+
+		this.queryService = null;
+		this.metricQueryServicePath = null;
+
+		if (reporterConfigurations.isEmpty()) {
+			// no reporters defined
+			// by default, don't report anything
+			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
+		} else {
+			// we have some reporters so
+			for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
+				String namedReporter = reporterConfiguration.f0;
+				Configuration reporterConfig = reporterConfiguration.f1;
+
+				final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
+				if (className == null) {
+					LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
+					continue;
+				}
+
+				try {
+					String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
+					// default reporting interval: 10 seconds
+					TimeUnit timeunit = TimeUnit.SECONDS;
+					long period = 10;
+
+					if (configuredPeriod != null) {
+						try {
+							String[] interval = configuredPeriod.split(" ");
+							// Parse both parts before assigning either one. Otherwise a value with a
+							// valid number but an invalid unit (e.g. '500 MILLISECOND') would leave the
+							// configured number combined with the default unit.
+							final long parsedPeriod = Long.parseLong(interval[0]);
+							final TimeUnit parsedTimeunit = TimeUnit.valueOf(interval[1]);
+							period = parsedPeriod;
+							timeunit = parsedTimeunit;
+						}
+						catch (Exception e) {
+							LOG.error("Cannot parse report interval from config: " + configuredPeriod +
+								" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
+								"Using default reporting interval.", e);
+						}
+					}
+
+					Class<?> reporterClass = Class.forName(className);
+					MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();
+
+					MetricConfig metricConfig = new MetricConfig();
+					reporterConfig.addAllToProperties(metricConfig);
+					LOG.info("Configuring {} with {}.", reporterClass.getSimpleName(), metricConfig);
+					reporterInstance.open(metricConfig);
+
+					if (reporterInstance instanceof Scheduled) {
+						LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
+
+						executor.scheduleWithFixedDelay(
+							new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
+					} else {
+						LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
+					}
+					reporters.add(reporterInstance);
+
+					// the delimiter list is index-aligned with the reporter list
+					String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
+					if (delimiterForReporter.length() != 1) {
+						LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
+						delimiterForReporter = String.valueOf(globalDelimiter);
+					}
+					this.delimiters.add(delimiterForReporter.charAt(0));
+				}
+				catch (Throwable t) {
+					LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Starts the {@link MetricQueryService} on the given actor system and remembers both its
+	 * actor reference and its Akka URL.
+	 *
+	 * <p>A failure while starting the service is only logged; the registry stays usable without it.
+	 * Must not be called after the registry has been shut down (enforced via
+	 * {@code Preconditions.checkState}).
+	 *
+	 * @param actorSystem ActorSystem to create the MetricQueryService on
+	 * @param resourceID resource ID used to disambiguate the actor name
+	 */
+	public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) {
+		synchronized (lock) {
+			Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
+
+			try {
+				final ActorRef startedService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+				// publish the actor first: it stays set even if resolving its URL fails below
+				queryService = startedService;
+				metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, startedService);
+			} catch (Exception e) {
+				LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
+			}
+		}
+	}
+
+	/**
+	 * Returns the Akka URL under which the {@link MetricQueryService} can be reached.
+	 *
+	 * @return address of the metric query service, or {@code null} if the service has not been
+	 *         started successfully
+	 */
+	@Nullable
+	public String getMetricQueryServicePath() {
+		return this.metricQueryServicePath;
+	}
+
+	/**
+	 * Returns the global delimiter taken from the registry configuration.
+	 *
+	 * @return global delimiter character
+	 */
+	@Override
+	public char getDelimiter() {
+		return globalDelimiter;
+	}
+
+	/**
+	 * Returns the delimiter configured for the reporter with the given index.
+	 *
+	 * <p>If no delimiter is registered under that index, a warning is logged and the global
+	 * delimiter is returned instead.
+	 *
+	 * @param reporterIndex index of the reporter, in the order the reporters were set up
+	 * @return delimiter of that reporter, or the global delimiter for an unknown index
+	 */
+	@Override
+	public char getDelimiter(int reporterIndex) {
+		// explicit range check instead of catching IndexOutOfBoundsException as control flow
+		if (reporterIndex < 0 || reporterIndex >= delimiters.size()) {
+			LOG.warn("Delimiter for reporter index {} not found, returning global delimiter.", reporterIndex);
+			return this.globalDelimiter;
+		}
+		return delimiters.get(reporterIndex);
+	}
+
+	/**
+	 * Returns the number of reporters held by this registry.
+	 *
+	 * @return number of reporters, or {@code 0} once the registry has been shut down
+	 */
+	@Override
+	public int getNumberReporters() {
+		// shutdown() sets the reporter list to null; read the field once so the null check
+		// and the size() call operate on the same reference
+		final List<MetricReporter> currentReporters = reporters;
+		return currentReporters == null ? 0 : currentReporters.size();
+	}
+
+	/**
+	 * Returns the registry's internal reporter list (not a copy).
+	 *
+	 * @return the reporters of this registry; {@code null} after {@link #shutdown()}
+	 */
+	public List<MetricReporter> getReporters() {
+		return this.reporters;
+	}
+
+	/**
+	 * Tells whether {@link #shutdown()} has already run on this registry.
+	 *
+	 * @return true if the reporters were released and the executor was shut down, otherwise false
+	 */
+	public boolean isShutdown() {
+		synchronized (lock) {
+			if (reporters != null) {
+				// reporters are only released by shutdown()
+				return false;
+			}
+			return executor.isShutdown();
+		}
+	}
+
+	/**
+	 * Shuts down this registry: asks the query service actor (if any) to stop, closes all
+	 * {@link MetricReporter MetricReporters}, stops the executor and finally waits for the
+	 * query service actor to terminate.
+	 *
+	 * <p>Failures while closing a reporter or stopping the actor are logged and do not abort the
+	 * shutdown.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			final FiniteDuration gracePeriod = new FiniteDuration(1L, TimeUnit.SECONDS);
+
+			// trigger the actor stop first so that it proceeds while reporters and executor are closed
+			final Future<Boolean> pendingStop = queryService == null
+				? null
+				: Patterns.gracefulStop(queryService, gracePeriod);
+
+			if (reporters != null) {
+				for (MetricReporter openReporter : reporters) {
+					try {
+						openReporter.close();
+					} catch (Throwable t) {
+						LOG.warn("Metrics reporter did not shut down cleanly", t);
+					}
+				}
+				reporters = null;
+			}
+			shutdownExecutor();
+
+			if (pendingStop == null) {
+				// no query service was started, nothing left to wait for
+				return;
+			}
+
+			boolean actorStopped = false;
+			try {
+				actorStopped = Await.result(pendingStop, gracePeriod);
+			} catch (Exception e) {
+				LOG.warn("Query actor did not properly stop.", e);
+			}
+
+			if (!actorStopped) {
+				// the query actor did not stop in time, terminate it forcefully
+				queryService.tell(Kill.getInstance(), ActorRef.noSender());
+			}
+		}
+	}
+
+	/**
+	 * Stops the executor: initiates an orderly shutdown, waits up to one second for running
+	 * tasks to finish and cancels whatever is still running afterwards.
+	 */
+	private void shutdownExecutor() {
+		if (executor != null) {
+			executor.shutdown();
+
+			try {
+				if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) {
+					executor.shutdownNow();
+				}
+			} catch (InterruptedException e) {
+				executor.shutdownNow();
+				// restore the interrupt flag so that callers further up can observe the interruption
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+
+	/**
+	 * Returns the scope formats taken from the registry configuration.
+	 *
+	 * @return scope formats of this registry
+	 */
+	@Override
+	public ScopeFormats getScopeFormats() {
+		return this.scopeFormats;
+	}
+
+ // ------------------------------------------------------------------------
+ // Metrics (de)registration
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Announces a newly added metric to every reporter, to the query service (if started) and,
+	 * for {@link View} metrics, to the view updater.
+	 *
+	 * <p>Each notification is isolated: a failure is logged and the remaining recipients are
+	 * still notified. After shutdown the call only logs a warning.
+	 *
+	 * @param metric the metric to register
+	 * @param metricName the name of the metric
+	 * @param group the group that contains the metric
+	 */
+	@Override
+	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+		synchronized (lock) {
+			if (isShutdown()) {
+				LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
+				return;
+			}
+
+			if (reporters != null) {
+				for (int index = 0; index < reporters.size(); index++) {
+					final MetricReporter target = reporters.get(index);
+					if (target == null) {
+						continue;
+					}
+					try {
+						// the front group carries the reporter index along with the group
+						FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(index, group);
+						target.notifyOfAddedMetric(metric, metricName, front);
+					} catch (Exception e) {
+						LOG.warn("Error while registering metric.", e);
+					}
+				}
+			}
+
+			if (queryService != null) {
+				try {
+					MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
+				} catch (Exception e) {
+					LOG.warn("Error while registering metric.", e);
+				}
+			}
+
+			if (metric instanceof View) {
+				try {
+					// the view updater is created lazily with the first registered view
+					if (viewUpdater == null) {
+						viewUpdater = new ViewUpdater(executor);
+					}
+					viewUpdater.notifyOfAddedView((View) metric);
+				} catch (Exception e) {
+					LOG.warn("Error while registering metric.", e);
+				}
+			}
+		}
+	}
+
+ @Override
+ public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+ synchronized (lock) {
+ if (isShutdown()) {
+ LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
+ } else {
+ if (reporters != null) {
+ for (int i = 0; i < reporters.size(); i++) {
+ try {
+ MetricReporter reporter = reporters.get(i);
+ if (reporter != null) {
+ FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+ reporter.notifyOfRemovedMetric(metric, metricName, front);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error while registering metric.", e);
+ }
+ }
+ }
+ try {
+ if (queryService != null) {
+ MetricQueryService.notifyOfRemovedMetric(queryService, metric);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error while registering metric.", e);
+ }
+ try {
+ if (metric instanceof View) {
+ if (viewUpdater != null) {
+ viewUpdater.notifyOfRemovedView((View) metric);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Error while registering metric.", e);
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Exposes the query service actor for tests.
+	 *
+	 * @return the query service actor, or {@code null} if it has not been started successfully
+	 */
+	@VisibleForTesting
+	@Nullable
+	public ActorRef getQueryService() {
+		return this.queryService;
+	}
+
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Task that triggers a single {@link Scheduled#report()} call; the registry schedules one
+	 * such task per {@link Scheduled} reporter on its executor.
+	 *
+	 * <p>The class is deliberately static so that a scheduled task does not carry an implicit
+	 * reference to the enclosing {@link MetricRegistryImpl} instance; it only references the
+	 * reporter it triggers.
+	 */
+	private static final class ReporterTask extends TimerTask {
+
+		private final Scheduled reporter;
+
+		private ReporterTask(Scheduled scheduledReporter) {
+			this.reporter = scheduledReporter;
+		}
+
+		@Override
+		public void run() {
+			try {
+				reporter.report();
+			} catch (Throwable reportFailure) {
+				// never let a failure escape: an exception thrown from a fixed-delay task
+				// suppresses all of its subsequent executions
+				LOG.warn("Error while reporting metrics", reportFailure);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index ab59977..66eace5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -106,7 +106,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
this.registry = checkNotNull(registry);
this.scopeComponents = checkNotNull(scope);
this.parent = parent;
- this.scopeStrings = new String[registry.getReporters() == null ? 0 : registry.getReporters().size()];
+ this.scopeStrings = new String[registry.getNumberReporters()];
}
public Map<String, String> getAllVariables() {
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index dd352bb..d4248ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -68,7 +68,7 @@ public class MiniCluster {
private final MiniClusterConfiguration miniClusterConfiguration;
@GuardedBy("lock")
- private MetricRegistry metricRegistry;
+ private MetricRegistryImpl metricRegistry;
@GuardedBy("lock")
private RpcService commonRpcService;
@@ -464,8 +464,8 @@ public class MiniCluster {
*
* @param config The configuration of the mini cluster
*/
- protected MetricRegistry createMetricRegistry(Configuration config) {
- return new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+ protected MetricRegistryImpl createMetricRegistry(Configuration config) {
+ return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
}
/**
@@ -502,7 +502,7 @@ public class MiniCluster {
Configuration configuration,
HighAvailabilityServices haServices,
HeartbeatServices heartbeatServices,
- MetricRegistry metricRegistry,
+ MetricRegistryImpl metricRegistry,
int numResourceManagers,
RpcService[] resourceManagerRpcServices) throws Exception {
@@ -528,7 +528,7 @@ public class MiniCluster {
protected TaskExecutor[] startTaskManagers(
Configuration configuration,
HighAvailabilityServices haServices,
- MetricRegistry metricRegistry,
+ MetricRegistryImpl metricRegistry,
int numTaskManagers,
RpcService[] taskManagerRpcServices) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 60d9a66..ca042b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
@@ -75,7 +75,7 @@ public class MiniClusterJobDispatcher {
private final JobManagerServices jobManagerServices;
/** Registry for all metrics in the mini cluster */
- private final MetricRegistry metricRegistry;
+ private final MetricRegistryImpl metricRegistry;
/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
private final int numJobManagers;
@@ -104,7 +104,7 @@ public class MiniClusterJobDispatcher {
HighAvailabilityServices haServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
- MetricRegistry metricRegistry) throws Exception {
+ MetricRegistryImpl metricRegistry) throws Exception {
this(
config,
haServices,
@@ -132,7 +132,7 @@ public class MiniClusterJobDispatcher {
HighAvailabilityServices haServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
- MetricRegistry metricRegistry,
+ MetricRegistryImpl metricRegistry,
int numJobManagers,
RpcService[] rpcServices) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
index cbefe5a..90fb115 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -18,9 +18,6 @@
package org.apache.flink.runtime.minicluster;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,18 +28,25 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
/**
* Mini cluster to run the old JobManager code without embedded high availability services. This
* class has been implemented because the normal {@link FlinkMiniCluster} has been changed to use
@@ -63,6 +67,8 @@ public class StandaloneMiniCluster {
private final HighAvailabilityServices highAvailabilityServices;
+ private final MetricRegistryImpl metricRegistry;
+
private final FiniteDuration timeout;
private final int port;
@@ -86,21 +92,28 @@ public class StandaloneMiniCluster {
Executors.directExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
+ metricRegistry = new MetricRegistryImpl(
+ MetricRegistryConfiguration.fromConfiguration(configuration));
+
JobManager.startJobManagerActors(
configuration,
actorSystem,
scheduledExecutorService,
scheduledExecutorService,
highAvailabilityServices,
+ metricRegistry,
Option.empty(),
JobManager.class,
MemoryArchivist.class);
+ final ResourceID taskManagerResourceId = ResourceID.generate();
+
ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
configuration,
- ResourceID.generate(),
+ taskManagerResourceId,
actorSystem,
highAvailabilityServices,
+ metricRegistry,
LOCAL_HOSTNAME,
Option.<String>empty(),
true,
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 1d4d4f3..98b80c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -118,7 +118,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
/** Registry to use for metrics. */
- private final MetricRegistry metricRegistry;
+ private final MetricRegistryImpl metricRegistry;
/** Fatal error handler. */
private final FatalErrorHandler fatalErrorHandler;
@@ -140,7 +140,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
- MetricRegistry metricRegistry,
+ MetricRegistryImpl metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
@@ -498,8 +498,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
@Override
- public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
- final ArrayList<Tuple2<InstanceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
+ public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+ final ArrayList<Tuple2<ResourceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) {
final ResourceID tmResourceId = workerRegistrationEntry.getKey();
@@ -508,7 +508,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
final String tmMetricQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + tmResourceId.getResourceIdString();
- metricQueryServicePaths.add(Tuple2.of(workerRegistration.getInstanceID(), tmMetricQueryServicePath));
+ metricQueryServicePaths.add(Tuple2.of(tmResourceId, tmMetricQueryServicePath));
}
return CompletableFuture.completedFuture(metricQueryServicePaths);
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 9eacb4b..cc2766b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -176,5 +176,5 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
* @param timeout for the asynchronous operation
* @return Future containing the collection of instance ids and the corresponding metric query service path
*/
- CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+ CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index caa3ba0..361bdd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.FlinkException;
@@ -55,7 +55,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
final RpcService rpcService,
final HighAvailabilityServices highAvailabilityServices,
final HeartbeatServices heartbeatServices,
- final MetricRegistry metricRegistry) throws Exception {
+ final MetricRegistryImpl metricRegistry) throws Exception {
Preconditions.checkNotNull(resourceId);
Preconditions.checkNotNull(configuration);
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 624f31d..d2b1205 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -45,7 +45,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
- MetricRegistry metricRegistry,
+ MetricRegistryImpl metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index cf5bfcb..4d6ccd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -31,8 +31,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.WebHandler;
@@ -166,13 +166,13 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
//fetch TaskManager logs if no other process is currently doing it
if (lastRequestPending.putIfAbsent(taskManagerID, true) == null) {
try {
- InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
- CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+ ResourceID resourceId = new ResourceID(new String(StringUtils.hexStringToByte(taskManagerID)));
+ CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, timeout);
CompletableFuture<TransientBlobKey> blobKeyFuture = taskManagerFuture.thenCompose(
(Optional<Instance> optTMInstance) -> {
Instance taskManagerInstance = optTMInstance.orElseThrow(
- () -> new CompletionException(new FlinkException("Could not find instance with " + instanceID + '.')));
+ () -> new CompletionException(new FlinkException("Could not find instance with " + resourceId + '.')));
switch (fileMode) {
case LOG:
return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index ad2ee1b..84c6e41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -19,19 +19,20 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -74,8 +75,16 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
// return them in an array. This avoids unnecessary code complexity.
// If only one task manager is requested, we only fetch one task manager metrics.
if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
- InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
- CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+ final String unescapedString;
+
+ try {
+ unescapedString = URLDecoder.decode(pathParams.get(TASK_MANAGER_ID_KEY), "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ return FutureUtils.completedExceptionally(new FlinkException("Could not decode task manager id: " + pathParams.get(TASK_MANAGER_ID_KEY) + '.', e));
+ }
+
+ ResourceID resourceId = new ResourceID(unescapedString);
+ CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, timeout);
return tmInstanceFuture.thenApplyAsync(
(Optional<Instance> optTaskManager) -> {
@@ -116,7 +125,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
for (Instance instance : instances) {
gen.writeStartObject();
- gen.writeStringField("id", instance.getId().toString());
+ gen.writeStringField("id", instance.getTaskManagerID().getResourceIdString());
gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
@@ -131,7 +140,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
fetcher.update();
- MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+ MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getTaskManagerID().getResourceIdString());
if (metrics != null) {
gen.writeObjectFieldStart("metrics");
long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index 1bfb9f2..e71a1d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
@@ -145,20 +145,20 @@ public class MetricFetcher<T extends RestfulGateway> {
// TODO: Once the old code has been ditched, remove the explicit TaskManager query service discovery
// TODO: and return it as part of requestQueryServicePaths. Moreover, change the MetricStore such that
// TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it be a cache with expiry time
- CompletableFuture<Collection<Tuple2<InstanceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway
+ CompletableFuture<Collection<Tuple2<ResourceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway
.requestTaskManagerMetricQueryServicePaths(timeout);
taskManagerQueryServicePathsFuture.whenCompleteAsync(
- (Collection<Tuple2<InstanceID, String>> queryServicePaths, Throwable throwable) -> {
+ (Collection<Tuple2<ResourceID, String>> queryServicePaths, Throwable throwable) -> {
if (throwable != null) {
LOG.warn("Requesting TaskManager's path for query services failed.", throwable);
} else {
List<String> taskManagersToRetain = queryServicePaths
.stream()
.map(
- (Tuple2<InstanceID, String> tuple) -> {
+ (Tuple2<ResourceID, String> tuple) -> {
retrieveAndQueryMetrics(tuple.f1);
- return tuple.f0.toString();
+ return tuple.f0.getResourceIdString();
}
).collect(Collectors.toList());
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index cd67705..a956111 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -55,7 +55,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -135,7 +135,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
private final NetworkEnvironment networkEnvironment;
/** The metric registry in the task manager */
- private final MetricRegistry metricRegistry;
+ private final MetricRegistryImpl metricRegistry;
/** The heartbeat manager for job manager in the task manager */
private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
@@ -179,7 +179,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
NetworkEnvironment networkEnvironment,
HighAvailabilityServices haServices,
HeartbeatServices heartbeatServices,
- MetricRegistry metricRegistry,
+ MetricRegistryImpl metricRegistry,
TaskManagerMetricGroup taskManagerMetricGroup,
BroadcastVariableManager broadcastVariableManager,
FileCache fileCache,
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 782ab07..5a69bb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -29,22 +29,19 @@ import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
@@ -86,7 +83,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
private final HighAvailabilityServices highAvailabilityServices;
- private final MetricRegistry metricRegistry;
+ private final MetricRegistryImpl metricRegistry;
/** Executor used to run future callbacks */
private final ExecutorService executor;
@@ -112,7 +109,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
- metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+ metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
metricRegistry.startQueryService(actorSystem, resourceId);
@@ -250,7 +247,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
- MetricRegistry metricRegistry,
+ MetricRegistryImpl metricRegistry,
boolean localCommunicationOnly,
FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -269,18 +266,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
- resourceID);
+ resourceID,
+ metricRegistry);
TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
- TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
- metricRegistry,
- taskManagerServices.getTaskManagerLocation().getHostname(),
- resourceID.toString());
-
- // Initialize the TM metrics
- TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
-
return new TaskExecutor(
rpcService,
taskManagerConfiguration,
@@ -291,7 +281,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
highAvailabilityServices,
heartbeatServices,
metricRegistry,
- taskManagerMetricGroup,
+ taskManagerServices.getTaskManagerMetricGroup(),
taskManagerServices.getBroadcastVariableManager(),
taskManagerServices.getFileCache(),
taskManagerServices.getTaskSlotTable(),
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 2baf644..85e62c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
@@ -62,7 +63,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager},
- * {@link NetworkEnvironment} and the {@link MetricRegistry}.
+ * {@link NetworkEnvironment} and the {@link MetricRegistryImpl}.
*/
public class TaskManagerServices {
private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);
@@ -72,7 +73,6 @@ public class TaskManagerServices {
private final MemoryManager memoryManager;
private final IOManager ioManager;
private final NetworkEnvironment networkEnvironment;
- private final MetricRegistry metricRegistry;
private final TaskManagerMetricGroup taskManagerMetricGroup;
private final BroadcastVariableManager broadcastVariableManager;
private final FileCache fileCache;
@@ -85,7 +85,6 @@ public class TaskManagerServices {
MemoryManager memoryManager,
IOManager ioManager,
NetworkEnvironment networkEnvironment,
- MetricRegistry metricRegistry,
TaskManagerMetricGroup taskManagerMetricGroup,
BroadcastVariableManager broadcastVariableManager,
FileCache fileCache,
@@ -97,7 +96,6 @@ public class TaskManagerServices {
this.memoryManager = Preconditions.checkNotNull(memoryManager);
this.ioManager = Preconditions.checkNotNull(ioManager);
this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
- this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
this.fileCache = Preconditions.checkNotNull(fileCache);
@@ -126,10 +124,6 @@ public class TaskManagerServices {
return taskManagerLocation;
}
- public MetricRegistry getMetricRegistry() {
- return metricRegistry;
- }
-
public TaskManagerMetricGroup getTaskManagerMetricGroup() {
return taskManagerMetricGroup;
}
@@ -163,12 +157,14 @@ public class TaskManagerServices {
*
* @param resourceID resource ID of the task manager
* @param taskManagerServicesConfiguration task manager configuration
+ * @param metricRegistry to register the TaskManagerMetricGroup
* @return task manager components
* @throws Exception
*/
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
- ResourceID resourceID) throws Exception {
+ ResourceID resourceID,
+ MetricRegistry metricRegistry) throws Exception {
// pre-start checks
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -187,9 +183,6 @@ public class TaskManagerServices {
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
- final MetricRegistry metricRegistry = new MetricRegistry(
- taskManagerServicesConfiguration.getMetricRegistryConfiguration());
-
final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
metricRegistry,
taskManagerLocation.getHostname(),
@@ -223,7 +216,6 @@ public class TaskManagerServices {
memoryManager,
ioManager,
network,
- metricRegistry,
taskManagerMetricGroup,
broadcastVariableManager,
fileCache,
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index bfd37bc..990fb22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.NetUtils;
@@ -72,8 +71,6 @@ public class TaskManagerServicesConfiguration {
private final float memoryFraction;
- private final MetricRegistryConfiguration metricRegistryConfiguration;
-
private final long timerServiceShutdownTimeout;
public TaskManagerServicesConfiguration(
@@ -85,7 +82,6 @@ public class TaskManagerServicesConfiguration {
long configuredMemory,
boolean preAllocateMemory,
float memoryFraction,
- MetricRegistryConfiguration metricRegistryConfiguration,
long timerServiceShutdownTimeout) {
this.taskManagerAddress = checkNotNull(taskManagerAddress);
@@ -98,8 +94,6 @@ public class TaskManagerServicesConfiguration {
this.preAllocateMemory = preAllocateMemory;
this.memoryFraction = memoryFraction;
- this.metricRegistryConfiguration = checkNotNull(metricRegistryConfiguration);
-
checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
"service shutdown timeout must be greater or equal to 0.");
this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
@@ -148,10 +142,6 @@ public class TaskManagerServicesConfiguration {
return preAllocateMemory;
}
- public MetricRegistryConfiguration getMetricRegistryConfiguration() {
- return metricRegistryConfiguration;
- }
-
public long getTimerServiceShutdownTimeout() {
return timerServiceShutdownTimeout;
}
@@ -211,8 +201,6 @@ public class TaskManagerServicesConfiguration {
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
- final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
-
long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();
return new TaskManagerServicesConfiguration(
@@ -224,7 +212,6 @@ public class TaskManagerServicesConfiguration {
configuredMemory,
preAllocateMemory,
memoryFraction,
- metricRegistryConfiguration,
timerServiceShutdownTimeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index d871b06..331e96b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -95,5 +95,5 @@ public interface RestfulGateway extends RpcGateway {
* @param timeout for the asynchronous operation
* @return Future containing the collection of resource ids and the corresponding metric query service path
*/
- CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+ CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index 74ef1de..1c573c0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -35,7 +35,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.messages.Acknowledge
import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistryImpl => FlinkMetricRegistry}
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -74,7 +75,7 @@ abstract class ContaineredJobManager(
submittedJobGraphs : SubmittedJobGraphStore,
checkpointRecoveryFactory : CheckpointRecoveryFactory,
jobRecoveryTimeout: FiniteDuration,
- metricsRegistry: Option[FlinkMetricRegistry],
+ jobManagerMetricGroup: JobManagerMetricGroup,
optRestAddress: Option[String])
extends JobManager(
flinkConfiguration,
@@ -91,7 +92,7 @@ abstract class ContaineredJobManager(
submittedJobGraphs,
checkpointRecoveryFactory,
jobRecoveryTimeout,
- metricsRegistry,
+ jobManagerMetricGroup,
optRestAddress) {
val jobPollingInterval: FiniteDuration
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0435046..d40a0fd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -34,7 +34,6 @@ import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration._
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
import org.apache.flink.metrics.{Gauge, MetricGroup}
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -50,10 +49,10 @@ import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServic
import org.apache.flink.runtime.execution.SuppressRestartsException
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.executiongraph._
-import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
@@ -66,20 +65,18 @@ import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.Messages.Disconnect
import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
+import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
-import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.metrics.util.MetricUtils
-import org.apache.flink.runtime.net.SSLUtils
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
import org.apache.flink.runtime.taskexecutor.TaskExecutor
import org.apache.flink.runtime.taskmanager.TaskManager
@@ -137,7 +134,7 @@ class JobManager(
protected val submittedJobGraphs : SubmittedJobGraphStore,
protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
protected val jobRecoveryTimeout: FiniteDuration,
- protected val metricsRegistry: Option[FlinkMetricRegistry],
+ protected val jobManagerMetricGroup: JobManagerMetricGroup,
protected val optRestAddress: Option[String])
extends FlinkActor
with LeaderSessionMessageFilter // mixin order is important, we want filtering after logging
@@ -154,16 +151,6 @@ class JobManager(
var leaderSessionID: Option[UUID] = None
- protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = metricsRegistry match {
- case Some(registry) =>
- val host = flinkConfiguration.getString(JobManagerOptions.ADDRESS)
- Option(new JobManagerMetricGroup(
- registry, NetUtils.unresolvedHostToNormalizedString(host)))
- case None =>
- log.warn("Could not instantiate JobManager metrics.")
- None
- }
-
/** Futures which have to be completed before terminating the job manager */
var futuresToComplete: Option[Seq[Future[Unit]]] = None
@@ -205,12 +192,7 @@ class JobManager(
throw new RuntimeException("Could not start the submitted job graphs service.", e)
}
- jobManagerMetricGroup match {
- case Some(group) =>
- instantiateMetrics(group)
- case None =>
- log.warn("Could not instantiate JobManager metric group.")
- }
+ instantiateMetrics(jobManagerMetricGroup)
}
override def postStop(): Unit = {
@@ -250,6 +232,8 @@ class JobManager(
archive ! decorateMessage(PoisonPill)
}
+ jobManagerMetricGroup.close()
+
instanceManager.shutdown()
scheduler.shutdown()
libraryCacheManager.shutdown()
@@ -260,13 +244,6 @@ class JobManager(
case e: IOException => log.error("Could not properly shutdown the blob server.", e)
}
- // failsafe shutdown of the metrics registry
- try {
- metricsRegistry.foreach(_.shutdown())
- } catch {
- case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
- }
-
log.debug(s"Job manager ${self.path} is completely stopped.")
}
@@ -1073,9 +1050,9 @@ class JobManager(
)
)
- case RequestTaskManagerInstance(instanceID) =>
+ case RequestTaskManagerInstance(resourceId) =>
sender ! decorateMessage(
- TaskManagerInstance(Option(instanceManager.getRegisteredInstanceById(instanceID)))
+ TaskManagerInstance(Option(instanceManager.getRegisteredInstance(resourceId)))
)
case Heartbeat(instanceID, accumulators) =>
@@ -1283,15 +1260,7 @@ class JobManager(
log.info(s"Using restart strategy $restartStrategy for $jobId.")
- val jobMetrics = jobManagerMetricGroup match {
- case Some(group) =>
- group.addJob(jobGraph) match {
- case (jobGroup:Any) => jobGroup
- case null => new UnregisteredMetricsGroup()
- }
- case None =>
- new UnregisteredMetricsGroup()
- }
+ val jobMetrics = jobManagerMetricGroup.addJob(jobGraph)
val numSlots = scheduler.getTotalNumberOfSlots()
@@ -1791,7 +1760,7 @@ class JobManager(
libraryCacheManager.unregisterJob(jobID)
blobServer.cleanupJob(jobID)
- jobManagerMetricGroup.foreach(_.removeJob(jobID))
+ jobManagerMetricGroup.removeJob(jobID)
futureOption
}
@@ -2042,7 +2011,12 @@ object JobManager {
val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
ioExecutor,
- AddressResolution.NO_ADDRESS_RESOLUTION);
+ AddressResolution.NO_ADDRESS_RESOLUTION)
+
+ val metricRegistry = new MetricRegistryImpl(
+ MetricRegistryConfiguration.fromConfiguration(configuration))
+
+ metricRegistry.startQueryService(jobManagerSystem, null)
val (_, _, webMonitorOption, _) = try {
startJobManagerActors(
@@ -2053,6 +2027,7 @@ object JobManager {
futureExecutor,
ioExecutor,
highAvailabilityServices,
+ metricRegistry,
classOf[JobManager],
classOf[MemoryArchivist],
Option(classOf[StandaloneResourceManager])
@@ -2085,6 +2060,13 @@ object JobManager {
LOG.warn("Could not properly stop the high availability services.", t)
}
+ try {
+ metricRegistry.shutdown()
+ } catch {
+ case t: Throwable =>
+ LOG.warn("Could not properly shut down the metric registry.", t)
+ }
+
FlinkExecutors.gracefulShutdown(
timeout.toMillis,
TimeUnit.MILLISECONDS,
@@ -2191,6 +2173,7 @@ object JobManager {
futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
highAvailabilityServices: HighAvailabilityServices,
+ metricRegistry: FlinkMetricRegistry,
jobManagerClass: Class[_ <: JobManager],
archiveClass: Class[_ <: MemoryArchivist],
resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2231,6 +2214,7 @@ object JobManager {
futureExecutor,
ioExecutor,
highAvailabilityServices,
+ metricRegistry,
webMonitor.map(_.getRestAddress),
jobManagerClass,
archiveClass)
@@ -2250,11 +2234,14 @@ object JobManager {
if (executionMode == JobManagerMode.LOCAL) {
LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
+ val resourceId = ResourceID.generate()
+
val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor(
configuration,
- ResourceID.generate(),
+ resourceId,
jobManagerSystem,
highAvailabilityServices,
+ metricRegistry,
externalHostname,
Some(TaskExecutor.TASK_MANAGER_NAME),
localTaskManagerCommunication = true,
@@ -2433,7 +2420,8 @@ object JobManager {
configuration: Configuration,
futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
- blobStore: BlobStore) :
+ blobStore: BlobStore,
+ metricRegistry: FlinkMetricRegistry) :
(InstanceManager,
FlinkScheduler,
BlobServer,
@@ -2443,7 +2431,7 @@ object JobManager {
Int, // number of archived jobs
Option[Path], // archive path
FiniteDuration, // timeout for job recovery
- Option[FlinkMetricRegistry]
+ JobManagerMetricGroup
) = {
val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -2525,12 +2513,9 @@ object JobManager {
}
}
- val metricRegistry = try {
- Option(new FlinkMetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)))
- } catch {
- case _: Exception =>
- None
- }
+ val jobManagerMetricGroup = new JobManagerMetricGroup(
+ metricRegistry,
+ configuration.getString(JobManagerOptions.ADDRESS))
(instanceManager,
scheduler,
@@ -2541,7 +2526,7 @@ object JobManager {
archiveCount,
archivePath,
jobRecoveryTimeout,
- metricRegistry)
+ jobManagerMetricGroup)
}
/**
@@ -2564,6 +2549,7 @@ object JobManager {
futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
highAvailabilityServices: HighAvailabilityServices,
+ metricRegistry: FlinkMetricRegistry,
optRestAddress: Option[String],
jobManagerClass: Class[_ <: JobManager],
archiveClass: Class[_ <: MemoryArchivist])
@@ -2575,6 +2561,7 @@ object JobManager {
futureExecutor,
ioExecutor,
highAvailabilityServices,
+ metricRegistry,
optRestAddress,
Some(JobMaster.JOB_MANAGER_NAME),
Some(JobMaster.ARCHIVE_NAME),
@@ -2606,6 +2593,7 @@ object JobManager {
futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
highAvailabilityServices: HighAvailabilityServices,
+ metricRegistry: FlinkMetricRegistry,
optRestAddress: Option[String],
jobManagerActorName: Option[String],
archiveActorName: Option[String],
@@ -2622,11 +2610,12 @@ object JobManager {
archiveCount,
archivePath,
jobRecoveryTimeout,
- metricsRegistry) = createJobManagerComponents(
+ jobManagerMetricGroup) = createJobManagerComponents(
configuration,
futureExecutor,
ioExecutor,
- highAvailabilityServices.createBlobStore())
+ highAvailabilityServices.createBlobStore(),
+ metricRegistry)
val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath)
@@ -2653,7 +2642,7 @@ object JobManager {
highAvailabilityServices.getSubmittedJobGraphStore(),
highAvailabilityServices.getCheckpointRecoveryFactory(),
jobRecoveryTimeout,
- metricsRegistry,
+ jobManagerMetricGroup,
optRestAddress)
val jobManager: ActorRef = jobManagerActorName match {
@@ -2661,12 +2650,6 @@ object JobManager {
case None => actorSystem.actorOf(jobManagerProps)
}
- metricsRegistry match {
- case Some(registry) =>
- registry.startQueryService(actorSystem, null)
- case None =>
- }
-
(jobManager, archive)
}
@@ -2693,7 +2676,7 @@ object JobManager {
submittedJobGraphStore: SubmittedJobGraphStore,
checkpointRecoveryFactory: CheckpointRecoveryFactory,
jobRecoveryTimeout: FiniteDuration,
- metricsRegistry: Option[FlinkMetricRegistry],
+ jobManagerMetricGroup: JobManagerMetricGroup,
optRestAddress: Option[String]): Props = {
Props(
@@ -2712,7 +2695,7 @@ object JobManager {
submittedJobGraphStore,
checkpointRecoveryFactory,
jobRecoveryTimeout,
- metricsRegistry,
+ jobManagerMetricGroup,
optRestAddress)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 3e896ca..5c19c7a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -24,8 +24,9 @@ import java.util.UUID
import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.blob.{PermanentBlobKey}
+import org.apache.flink.runtime.blob.PermanentBlobKey
import org.apache.flink.runtime.client.{JobStatusMessage, SerializedJobExecutionResult}
+import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph}
import org.apache.flink.runtime.instance.{Instance, InstanceID}
import org.apache.flink.runtime.io.network.partition.ResultPartitionID
@@ -419,9 +420,9 @@ object JobManagerMessages {
/**
* Requests the [[Instance]] object of the task manager with the given instance ID
*
- * @param instanceID Instance ID of the task manager
+ * @param resourceId identifying the TaskManager which shall be retrieved
*/
- case class RequestTaskManagerInstance(instanceID: InstanceID)
+ case class RequestTaskManagerInstance(resourceId: ResourceID)
/**
* Returns the [[Instance]] object of the requested task manager. This is in response to
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index c152f4a..689d98f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl}
import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -121,6 +122,9 @@ abstract class FlinkMiniCluster(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("mini-cluster-io"))
+ protected val metricRegistry = new MetricRegistryImpl(
+ MetricRegistryConfiguration.fromConfiguration(originalConfiguration))
+
def this(configuration: Configuration, useSingleActorSystem: Boolean) {
this(
configuration,
@@ -325,6 +329,10 @@ abstract class FlinkMiniCluster(
lazy val singleActorSystem = startJobManagerActorSystem(0)
+ if (originalConfiguration.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
+ metricRegistry.startQueryService(singleActorSystem, null)
+ }
+
val (jmActorSystems, jmActors) =
(for(i <- 0 until numJobManagers) yield {
val actorSystem = if(useSingleActorSystem) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index e22230e..e9bdb2a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.minicluster
import java.net.InetAddress
+import java.util.UUID
import java.util.concurrent.{Executor, ScheduledExecutorService}
import akka.actor.{ActorRef, ActorSystem, Props}
@@ -46,7 +47,8 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, TaskManagerMetricGroup}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl}
import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
import org.apache.flink.runtime.util.EnvironmentInformation
@@ -83,6 +85,12 @@ class LocalFlinkMiniCluster(
def this(userConfiguration: Configuration) = this(userConfiguration, true)
+ override def startInternalShutdown() {
+ metricRegistry.shutdown()
+
+ super.startInternalShutdown()
+ }
+
// --------------------------------------------------------------------------
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
@@ -137,23 +145,20 @@ class LocalFlinkMiniCluster(
}
val (instanceManager,
- scheduler,
- blobServer,
- libraryCacheManager,
- restartStrategyFactory,
- timeout,
- archiveCount,
- archivePath,
- jobRecoveryTimeout,
- metricsRegistry) = JobManager.createJobManagerComponents(
+ scheduler,
+ blobServer,
+ libraryCacheManager,
+ restartStrategyFactory,
+ timeout,
+ archiveCount,
+ archivePath,
+ jobRecoveryTimeout,
+ jobManagerMetricGroup) = JobManager.createJobManagerComponents(
config,
futureExecutor,
ioExecutor,
- highAvailabilityServices.createBlobStore())
-
- if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
- metricsRegistry.get.startQueryService(system, null)
- }
+ highAvailabilityServices.createBlobStore(),
+ metricRegistry)
val archive = system.actorOf(
getArchiveProps(
@@ -180,7 +185,7 @@ class LocalFlinkMiniCluster(
highAvailabilityServices.getSubmittedJobGraphStore(),
highAvailabilityServices.getCheckpointRecoveryFactory(),
jobRecoveryTimeout,
- metricsRegistry,
+ jobManagerMetricGroup,
optRestAddress),
jobManagerName)
}
@@ -248,9 +253,8 @@ class LocalFlinkMiniCluster(
val taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
- resourceID)
-
- val metricRegistry = taskManagerServices.getMetricRegistry()
+ resourceID,
+ metricRegistry)
val props = getTaskManagerProps(
taskManagerClass,
@@ -260,7 +264,7 @@ class LocalFlinkMiniCluster(
taskManagerServices.getMemoryManager(),
taskManagerServices.getIOManager(),
taskManagerServices.getNetworkEnvironment,
- metricRegistry)
+ taskManagerServices.getTaskManagerMetricGroup)
if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
metricRegistry.startQueryService(system, resourceID)
@@ -296,7 +300,7 @@ class LocalFlinkMiniCluster(
submittedJobGraphStore: SubmittedJobGraphStore,
checkpointRecoveryFactory: CheckpointRecoveryFactory,
jobRecoveryTimeout: FiniteDuration,
- metricsRegistry: Option[MetricRegistry],
+ jobManagerMetricGroup: JobManagerMetricGroup,
optRestAddress: Option[String])
: Props = {
@@ -316,7 +320,7 @@ class LocalFlinkMiniCluster(
submittedJobGraphStore,
checkpointRecoveryFactory,
jobRecoveryTimeout,
- metricsRegistry,
+ jobManagerMetricGroup,
optRestAddress)
}
@@ -328,7 +332,7 @@ class LocalFlinkMiniCluster(
memoryManager: MemoryManager,
ioManager: IOManager,
networkEnvironment: NetworkEnvironment,
- metricsRegistry: MetricRegistry): Props = {
+ taskManagerMetricGroup: TaskManagerMetricGroup): Props = {
TaskManager.getTaskManagerProps(
taskManagerClass,
@@ -339,7 +343,7 @@ class LocalFlinkMiniCluster(
ioManager,
networkEnvironment,
highAvailabilityServices,
- metricsRegistry)
+ taskManagerMetricGroup)
}
def getResourceManagerProps(
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index cc01a8d..f209dac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -22,7 +22,7 @@ import java.io.{File, FileInputStream, IOException}
import java.lang.management.ManagementFactory
import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket}
import java.util
-import java.util.concurrent.{Callable, TimeUnit}
+import java.util.concurrent.{Callable, TimeUnit, TimeoutException}
import java.util.{Collections, UUID}
import _root_.akka.actor._
@@ -63,8 +63,7 @@ import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
import org.apache.flink.runtime.messages.{Acknowledge, StackTraceSampleResponse}
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
-import org.apache.flink.runtime.metrics.util.MetricUtils
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
@@ -127,7 +126,7 @@ class TaskManager(
protected val network: NetworkEnvironment,
protected val numberOfSlots: Int,
protected val highAvailabilityServices: HighAvailabilityServices,
- protected val metricsRegistry: FlinkMetricRegistry)
+ protected val taskManagerMetricGroup: TaskManagerMetricGroup)
extends FlinkActor
with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging
with LogMessages // Mixin order is important: first we want to support message logging
@@ -154,8 +153,6 @@ class TaskManager(
getJobManagerLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID)
- private var taskManagerMetricGroup : TaskManagerMetricGroup = _
-
/** Actors which want to be notified once this task manager has been
* registered at the job manager */
private val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
@@ -258,13 +255,8 @@ class TaskManager(
} catch {
case t: Exception => log.error("FileCache did not shutdown properly.", t)
}
-
- // failsafe shutdown of the metrics registry
- try {
- metricsRegistry.shutdown()
- } catch {
- case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
- }
+
+ taskManagerMetricGroup.close()
log.info(s"Task manager ${self.path} is completely shut down.")
}
@@ -980,12 +972,6 @@ class TaskManager(
throw new RuntimeException(message, e)
}
- taskManagerMetricGroup =
- new TaskManagerMetricGroup(metricsRegistry, location.getHostname, id.toString)
-
- MetricUtils.instantiateStatusMetrics(taskManagerMetricGroup)
- MetricUtils.instantiateNetworkMetrics(taskManagerMetricGroup, network)
-
// watch job manager to detect when it dies
context.watch(jobManager)
@@ -1832,15 +1818,22 @@ object TaskManager {
actorSystemPort,
LOG.logger)
+ val metricRegistry = new MetricRegistryImpl(
+ MetricRegistryConfiguration.fromConfiguration(configuration))
+
+ metricRegistry.startQueryService(taskManagerSystem, resourceID)
+
// start all the TaskManager services (network stack, library cache, ...)
// and the TaskManager actor
try {
+
LOG.info("Starting TaskManager actor")
val taskManager = startTaskManagerComponentsAndActor(
configuration,
resourceID,
taskManagerSystem,
highAvailabilityServices,
+ metricRegistry,
taskManagerHostname,
Some(TaskExecutor.TASK_MANAGER_NAME),
localTaskManagerCommunication = false,
@@ -1893,6 +1886,9 @@ object TaskManager {
}
throw t
}
+
+ // shut down the metric query service
+ metricRegistry.shutdown()
}
/**
@@ -1984,6 +1980,7 @@ object TaskManager {
resourceID: ResourceID,
actorSystem: ActorSystem,
highAvailabilityServices: HighAvailabilityServices,
+ metricRegistry: FlinkMetricRegistry,
taskManagerHostname: String,
taskManagerActorName: Option[String],
localTaskManagerCommunication: Boolean,
@@ -1999,9 +1996,8 @@ object TaskManager {
val taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
- resourceID)
-
- val metricRegistry = taskManagerServices.getMetricRegistry()
+ resourceID,
+ metricRegistry)
// create the actor properties (which define the actor constructor parameters)
val tmProps = getTaskManagerProps(
@@ -2013,9 +2009,7 @@ object TaskManager {
taskManagerServices.getIOManager(),
taskManagerServices.getNetworkEnvironment(),
highAvailabilityServices,
- metricRegistry)
-
- metricRegistry.startQueryService(actorSystem, resourceID)
+ taskManagerServices.getTaskManagerMetricGroup)
taskManagerActorName match {
case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -2032,7 +2026,7 @@ object TaskManager {
ioManager: IOManager,
networkEnvironment: NetworkEnvironment,
highAvailabilityServices: HighAvailabilityServices,
- metricsRegistry: FlinkMetricRegistry
+ taskManagerMetricGroup: TaskManagerMetricGroup
): Props = {
Props(
taskManagerClass,
@@ -2044,7 +2038,7 @@ object TaskManager {
networkEnvironment,
taskManagerConfig.getNumberSlots(),
highAvailabilityServices,
- metricsRegistry)
+ taskManagerMetricGroup)
}
// --------------------------------------------------------------------------