You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/23 14:13:53 UTC
flink git commit: [FLINK-6129] [metrics] Stop query actor of MetricRegistry
Repository: flink
Updated Branches:
refs/heads/master 3e860b407 -> 8319a457d
[FLINK-6129] [metrics] Stop query actor of MetricRegistry
This PR properly shuts down the query actor of the MetricRegistry upon shutdown.
Add locking to the MetricRegistry.
This closes #3573.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8319a457
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8319a457
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8319a457
Branch: refs/heads/master
Commit: 8319a457d9adee310ef64905709c03ca2f2afd61
Parents: 3e860b4
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Mar 20 14:55:30 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Mar 23 15:13:33 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/metrics/MetricRegistry.java | 157 +++++++++++++------
.../runtime/metrics/MetricRegistryTest.java | 34 ++++
2 files changed, 142 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8319a457/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index d3b21fc..9f46d47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.metrics;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -34,8 +37,12 @@ import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.List;
@@ -52,7 +59,9 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class MetricRegistry {
static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
-
+
+ private final Object lock = new Object();
+
private List<MetricReporter> reporters;
private ScheduledExecutorService executor;
private ActorRef queryService;
@@ -150,10 +159,14 @@ public class MetricRegistry {
* @param resourceID resource ID used to disambiguate the actor name
*/
public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) {
- try {
- queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
- } catch (Exception e) {
- LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
+ synchronized (lock) {
+ Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
+
+ try {
+ queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+ } catch (Exception e) {
+ LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
+ }
}
}
@@ -191,24 +204,51 @@ public class MetricRegistry {
* @return true, if this registry was shutdown, otherwise false
*/
public boolean isShutdown() {
- return reporters == null && executor.isShutdown();
+ synchronized (lock) {
+ return reporters == null && executor.isShutdown();
+ }
}
/**
* Shuts down this registry and the associated {@link MetricReporter}.
*/
public void shutdown() {
- if (reporters != null) {
- for (MetricReporter reporter : reporters) {
+ synchronized (lock) {
+ Future<Boolean> stopFuture = null;
+ FiniteDuration stopTimeout = null;
+
+ if (queryService != null) {
+ stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
+ stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+ }
+
+ if (reporters != null) {
+ for (MetricReporter reporter : reporters) {
+ try {
+ reporter.close();
+ } catch (Throwable t) {
+ LOG.warn("Metrics reporter did not shut down cleanly", t);
+ }
+ }
+ reporters = null;
+ }
+ shutdownExecutor();
+
+ if (stopFuture != null) {
+ boolean stopped = false;
+
try {
- reporter.close();
- } catch (Throwable t) {
- LOG.warn("Metrics reporter did not shut down cleanly", t);
+ stopped = Await.result(stopFuture, stopTimeout);
+ } catch (Exception e) {
+ LOG.warn("Query actor did not properly stop.", e);
+ }
+
+ if (!stopped) {
+ // the query actor did not stop in time, let's kill him
+ queryService.tell(Kill.getInstance(), ActorRef.noSender());
}
}
- reporters = null;
}
- shutdownExecutor();
}
private void shutdownExecutor() {
@@ -216,7 +256,7 @@ public class MetricRegistry {
executor.shutdown();
try {
- if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+ if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
@@ -241,27 +281,33 @@ public class MetricRegistry {
* @param group the group that contains the metric
*/
public void register(Metric metric, String metricName, AbstractMetricGroup group) {
- try {
- if (reporters != null) {
- for (int i = 0; i < reporters.size(); i++) {
- MetricReporter reporter = reporters.get(i);
- if (reporter != null) {
- FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
- reporter.notifyOfAddedMetric(metric, metricName, front);
+ synchronized (lock) {
+ if (isShutdown()) {
+ LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
+ } else {
+ try {
+ if (reporters != null) {
+ for (int i = 0; i < reporters.size(); i++) {
+ MetricReporter reporter = reporters.get(i);
+ if (reporter != null) {
+ FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+ reporter.notifyOfAddedMetric(metric, metricName, front);
+ }
+ }
}
+ if (queryService != null) {
+ MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
+ }
+ if (metric instanceof View) {
+ if (viewUpdater == null) {
+ viewUpdater = new ViewUpdater(executor);
+ }
+ viewUpdater.notifyOfAddedView((View) metric);
+ }
+ } catch (Exception e) {
+ LOG.error("Error while registering metric.", e);
}
}
- if (queryService != null) {
- MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
- }
- if (metric instanceof View) {
- if (viewUpdater == null) {
- viewUpdater = new ViewUpdater(executor);
- }
- viewUpdater.notifyOfAddedView((View) metric);
- }
- } catch (Exception e) {
- LOG.error("Error while registering metric.", e);
}
}
@@ -273,31 +319,44 @@ public class MetricRegistry {
* @param group the group that contains the metric
*/
public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
- try {
- if (reporters != null) {
- for (int i = 0; i < reporters.size(); i++) {
- MetricReporter reporter = reporters.get(i);
- if (reporter != null) {
- FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
- reporter.notifyOfRemovedMetric(metric, metricName, front);
+ synchronized (lock) {
+ if (isShutdown()) {
+ LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
+ } else {
+ try {
+ if (reporters != null) {
+ for (int i = 0; i < reporters.size(); i++) {
+ MetricReporter reporter = reporters.get(i);
+ if (reporter != null) {
+ FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+ reporter.notifyOfRemovedMetric(metric, metricName, front);
+ }
+ }
}
+ if (queryService != null) {
+ MetricQueryService.notifyOfRemovedMetric(queryService, metric);
+ }
+ if (metric instanceof View) {
+ if (viewUpdater != null) {
+ viewUpdater.notifyOfRemovedView((View) metric);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error while registering metric.", e);
}
}
- if (queryService != null) {
- MetricQueryService.notifyOfRemovedMetric(queryService, metric);
- }
- if (metric instanceof View) {
- if (viewUpdater != null) {
- viewUpdater.notifyOfRemovedView((View) metric);
- }
- }
- } catch (Exception e) {
- LOG.error("Error while registering metric.", e);
}
}
// ------------------------------------------------------------------------
+ @VisibleForTesting
+ public ActorRef getQueryService() {
+ return queryService;
+ }
+
+ // ------------------------------------------------------------------------
+
/**
* This task is explicitly a static class, so that it does not hold any references to the enclosing
* MetricsRegistry instance.
http://git-wip-us.apache.org/repos/asf/flink/blob/8319a457/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index ab4e7a4..fe29ccb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.metrics;
+import akka.actor.ActorNotFound;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
@@ -26,6 +29,7 @@ import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.metrics.util.TestReporter;
@@ -33,11 +37,15 @@ import org.apache.flink.runtime.metrics.util.TestReporter;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class MetricRegistryTest extends TestLogger {
@@ -348,6 +356,32 @@ public class MetricRegistryTest extends TestLogger {
assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister);
}
+ /**
+ * Tests that the query actor will be stopped when the MetricRegistry is shut down.
+ */
+ @Test
+ public void testQueryActorShutdown() throws Exception {
+ final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
+
+ MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+
+ final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+ registry.startQueryService(actorSystem, null);
+
+ ActorRef queryServiceActor = registry.getQueryService();
+
+ registry.shutdown();
+
+ try {
+ Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout);
+
+ fail("The query actor should be terminated resulting in a ActorNotFound exception.");
+ } catch (ActorNotFound e) {
+ // we expect the query actor to be shut down
+ }
+ }
+
public static class TestReporter8 extends TestReporter {
char expectedDelimiter;
public static int numCorrectDelimitersForRegister = 0;