You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/06 14:49:22 UTC
[1/2] flink git commit: [FLINK-5179] [metrics] Close TaskManagerMetricGroup on JobManager dissociation
Repository: flink
Updated Branches:
refs/heads/master 98d182603 -> f7f7b487b
[FLINK-5179] [metrics] Close TaskManagerMetricGroup on JobManager dissociation
This closes #2886.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5f4e3d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5f4e3d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5f4e3d8
Branch: refs/heads/master
Commit: e5f4e3d8aa330ebb88166a668b59b8d1730cb618
Parents: 98d1826
Author: zentol <ch...@apache.org>
Authored: Mon Nov 28 16:10:33 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Dec 6 14:03:08 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/metrics/MetricRegistry.java | 9 ++
.../flink/runtime/taskmanager/TaskManager.scala | 5 +-
.../runtime/metrics/MetricRegistryTest.java | 11 ++
.../runtime/metrics/TaskManagerMetricsTest.java | 133 +++++++++++++++++++
4 files changed, 155 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index c17fdb4..f4510db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -185,6 +185,15 @@ public class MetricRegistry {
}
/**
+ * Returns whether this registry has been shut down, i.e. {@code reporters} is null and the executor reports itself as shut down.
+ *
+ * @return true if this registry has been shut down, otherwise false
+ */
+ public boolean isShutdown() {
+ return reporters == null && executor.isShutdown();
+ }
+
+ /**
* Shuts down this registry and the associated {@link MetricReporter}.
*/
public void shutdown() {
http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a3b1382..271578f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1046,11 +1046,10 @@ class TaskManager(
network.getKvStateRegistry.unregisterListener()
}
- // failsafe shutdown of the metrics registry
try {
- metricsRegistry.shutdown()
+ taskManagerMetricGroup.close()
} catch {
- case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
+ case t: Exception => log.warn("TaskManagerMetricGroup could not be closed successfully.", t)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index 1157215..ab4e7a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -42,6 +42,17 @@ import static org.junit.Assert.assertTrue;
public class MetricRegistryTest extends TestLogger {
private static final char GLOBAL_DEFAULT_DELIMITER = '.';
+
+ @Test
+ public void testIsShutdown() {
+ MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ // a newly created registry must not report itself as shut down
+ Assert.assertFalse(metricRegistry.isShutdown());
+
+ metricRegistry.shutdown();
+ // once shutdown() has been called, isShutdown() must return true
+ Assert.assertTrue(metricRegistry.isShutdown());
+ }
/**
* Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
new file mode 100644
index 0000000..ad3de33
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.Option;
+import scala.Tuple7;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class TaskManagerMetricsTest {
+
+ /**
+ * Tests that the TaskManager's metric registry is not shut down when the TaskManager disconnects from and re-registers at the JobManager.
+ */
+ @Test
+ public void testMetricRegistryLifeCycle() {
+ ActorSystem actorSystem = null;
+ try {
+ actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+ // ================================================================
+ // Start JobManager
+ // ================================================================
+ final ActorRef jobManager = JobManager.startJobManagerActors(
+ new Configuration(),
+ actorSystem,
+ actorSystem.dispatcher(),
+ actorSystem.dispatcher(),
+ JobManager.class,
+ MemoryArchivist.class)._1();
+
+ LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
+
+ // ================================================================
+ // Start TaskManager
+ // ================================================================
+ ResourceID tmResourceID = ResourceID.generate();
+
+ Tuple7<TaskManagerConfiguration, TaskManagerLocation, MemoryManager, IOManager, NetworkEnvironment, LeaderRetrievalService, MetricRegistry> components =
+ TaskManager.createTaskManagerComponents(
+ new Configuration(),
+ tmResourceID,
+ "localhost",
+ true,
+ Option.apply(leaderRetrievalService)
+ );
+
+ // create the task manager
+ final Props tmProps = TaskManager.getTaskManagerProps(
+ TaskManager.class,
+ components._1(),
+ tmResourceID,
+ components._2(),
+ components._3(),
+ components._4(),
+ components._5(),
+ components._6(),
+ components._7());
+
+ final ActorRef taskManager = actorSystem.actorOf(tmProps);
+ // NOTE(review): the Within bound below is 5000 SECONDS (~83 min); if 5000 ms was intended, a failed registration hangs this test far longer than needed - confirm the unit
+ new JavaTestKit(actorSystem) {{
+ new Within(new FiniteDuration(5000, TimeUnit.SECONDS)) {
+ @Override
+ protected void run() {
+ taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+ getTestActor());
+
+ // wait for the TM to be registered
+ expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+
+ // trigger re-registration of TM; this should include a disconnect from the current JM
+ taskManager.tell(new TaskManagerMessages.JobManagerLeaderAddress(jobManager.path().toString(), null), jobManager);
+
+ // wait for re-registration to be completed
+ taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+ getTestActor());
+
+ expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+ }
+ };
+ }};
+
+ // verify that the registry was not shut down due to the disconnect
+ MetricRegistry tmRegistry = components._7();
+ Assert.assertFalse(tmRegistry.isShutdown());
+ // NOTE(review): this test never calls tmRegistry.shutdown() itself; confirm that stopping the TaskManager actor releases the registry, otherwise it leaks
+ // shut down the actors and the actor system
+ actorSystem.shutdown();
+ actorSystem.awaitTermination();
+ } finally {
+ if (actorSystem != null) {
+ actorSystem.shutdown();
+ }
+ }
+ }
+}
[2/2] flink git commit: [FLINK-5261] [metrics] Clean up meters in ScheduledDropwizardReporter
Posted by uc...@apache.org.
[FLINK-5261] [metrics] Clean up meters in ScheduledDropwizardReporter
This closes #2944.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7f7b487
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7f7b487
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7f7b487
Branch: refs/heads/master
Commit: f7f7b487b705e31062e5f00508a92bed5b185674
Parents: e5f4e3d
Author: zentol <ch...@apache.org>
Authored: Mon Dec 5 14:00:10 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Dec 6 14:06:51 2016 +0100
----------------------------------------------------------------------
.../dropwizard/ScheduledDropwizardReporter.java | 19 +++-
.../ScheduledDropwizardReporterTest.java | 92 ++++++++++++++++++++
2 files changed, 108 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f7f7b487/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
index b7e83b6..380abc4 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -21,11 +21,11 @@ package org.apache.flink.dropwizard;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Reporter;
import com.codahale.metrics.ScheduledReporter;
-
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
-import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper;
import org.apache.flink.dropwizard.metrics.FlinkMeterWrapper;
@@ -82,15 +82,26 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
// Getters
// ------------------------------------------------------------------------
- // used for testing purposes
+ @VisibleForTesting // the test-only accessors below return the internal metric -> full-name maps themselves, not copies
Map<Counter, String> getCounters() {
return counters;
}
+ @VisibleForTesting
Map<Meter, String> getMeters() {
return meters;
}
+ @VisibleForTesting
+ Map<Gauge<?>, String> getGauges() {
+ return gauges;
+ }
+
+ @VisibleForTesting
+ Map<Histogram, String> getHistograms() {
+ return histograms;
+ }
+
// ------------------------------------------------------------------------
// life cycle
// ------------------------------------------------------------------------
@@ -157,6 +168,8 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
fullName = gauges.remove(metric);
} else if (metric instanceof Histogram) {
fullName = histograms.remove(metric);
+ } else if (metric instanceof Meter) {
+ fullName = meters.remove(metric);
} else {
fullName = null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7f7b487/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 9385510..87c4ccb 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -24,9 +24,14 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -124,6 +129,93 @@ public class ScheduledDropwizardReporterTest {
metricRegistry.shutdown();
}
+ /**
+ * This test verifies that metrics are correctly added to and removed from the ScheduledDropwizardReporter and
+ * the underlying Dropwizard MetricRegistry.
+ */
+ @Test
+ public void testMetricCleanup() {
+ TestingScheduledDropwizardReporter rep = new TestingScheduledDropwizardReporter();
+
+ MetricGroup mp = new UnregisteredMetricsGroup();
+
+ Counter c = new SimpleCounter();
+ Meter m = new Meter() {
+ @Override
+ public void markEvent() {
+ }
+
+ @Override
+ public void markEvent(long n) {
+ }
+
+ @Override
+ public double getRate() {
+ return 0;
+ }
+
+ @Override
+ public long getCount() {
+ return 0;
+ }
+ };
+ Histogram h = new Histogram() {
+ @Override
+ public void update(long value) {
+ // intentionally empty: the test only checks registration bookkeeping, not values
+ }
+
+ @Override
+ public long getCount() {
+ return 0;
+ }
+
+ @Override
+ public HistogramStatistics getStatistics() {
+ return null;
+ }
+ };
+ Gauge<Object> g = new Gauge<Object>() {
+ @Override
+ public Object getValue() {
+ return null;
+ }
+ };
+
+ rep.notifyOfAddedMetric(c, "counter", mp);
+ assertEquals(1, rep.getCounters().size());
+ assertEquals(1, rep.registry.getCounters().size());
+
+ rep.notifyOfAddedMetric(m, "meter", mp);
+ assertEquals(1, rep.getMeters().size());
+ assertEquals(1, rep.registry.getMeters().size());
+
+ rep.notifyOfAddedMetric(h, "histogram", mp);
+ assertEquals(1, rep.getHistograms().size());
+ assertEquals(1, rep.registry.getHistograms().size());
+
+ rep.notifyOfAddedMetric(g, "gauge", mp);
+ assertEquals(1, rep.getGauges().size());
+ assertEquals(1, rep.registry.getGauges().size());
+
+ // remove every metric again; both the reporter's own maps and the Dropwizard registry must become empty
+ rep.notifyOfRemovedMetric(c, "counter", mp);
+ assertEquals(0, rep.getCounters().size());
+ assertEquals(0, rep.registry.getCounters().size());
+
+ rep.notifyOfRemovedMetric(m, "meter", mp);
+ assertEquals(0, rep.getMeters().size());
+ assertEquals(0, rep.registry.getMeters().size());
+
+ rep.notifyOfRemovedMetric(h, "histogram", mp);
+ assertEquals(0, rep.getHistograms().size());
+ assertEquals(0, rep.registry.getHistograms().size());
+
+ rep.notifyOfRemovedMetric(g, "gauge", mp);
+ assertEquals(0, rep.getGauges().size());
+ assertEquals(0, rep.registry.getGauges().size());
+ }
+
public static class TestingScheduledDropwizardReporter extends ScheduledDropwizardReporter {
@Override