You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/06 14:49:22 UTC

[1/2] flink git commit: [FLINK-5179] [metrics] Close TaskManagerMetricGroup on JobManager dissociation

Repository: flink
Updated Branches:
  refs/heads/master 98d182603 -> f7f7b487b


[FLINK-5179] [metrics] Close TaskManagerMetricGroup on JobManager dissociation

This closes #2886.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5f4e3d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5f4e3d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5f4e3d8

Branch: refs/heads/master
Commit: e5f4e3d8aa330ebb88166a668b59b8d1730cb618
Parents: 98d1826
Author: zentol <ch...@apache.org>
Authored: Mon Nov 28 16:10:33 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Dec 6 14:03:08 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/metrics/MetricRegistry.java   |   9 ++
 .../flink/runtime/taskmanager/TaskManager.scala |   5 +-
 .../runtime/metrics/MetricRegistryTest.java     |  11 ++
 .../runtime/metrics/TaskManagerMetricsTest.java | 133 +++++++++++++++++++
 4 files changed, 155 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index c17fdb4..f4510db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -185,6 +185,15 @@ public class MetricRegistry {
 	}
 
 	/**
+	 * Returns whether this registry has been shut down.
+	 *
+	 * @return true, if the reporters were cleared and the executor is shut down, otherwise false
+	 */
+	public boolean isShutdown() {
+		return reporters == null && executor.isShutdown();
+	}
+
+	/**
 	 * Shuts down this registry and the associated {@link MetricReporter}.
 	 */
 	public void shutdown() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a3b1382..271578f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1046,11 +1046,10 @@ class TaskManager(
       network.getKvStateRegistry.unregisterListener()
     }
     
-    // failsafe shutdown of the metrics registry
     try {
-      metricsRegistry.shutdown()
+      taskManagerMetricGroup.close()
     } catch {
-      case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
+      case t: Exception => log.warn("TaskManagerMetricGroup could not be closed successfully.", t)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index 1157215..ab4e7a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -42,6 +42,17 @@ import static org.junit.Assert.assertTrue;
 public class MetricRegistryTest extends TestLogger {
 
 	private static final char GLOBAL_DEFAULT_DELIMITER = '.';
+
+	@Test
+	public void testIsShutdown() {
+		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		
+		Assert.assertFalse(metricRegistry.isShutdown());
+		
+		metricRegistry.shutdown();
+		
+		Assert.assertTrue(metricRegistry.isShutdown());
+	}
 	
 	/**
 	 * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.

http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
new file mode 100644
index 0000000..ad3de33
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.Option;
+import scala.Tuple7;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class TaskManagerMetricsTest {
+
+	/**
+	 * Tests that the TaskManager's metric registry is not shut down when it re-registers at the JobManager.
+	 */
+	@Test
+	public void testMetricRegistryLifeCycle() {
+		ActorSystem actorSystem = null;
+		try {
+			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+			// ================================================================
+			// Start JobManager
+			// ================================================================
+			final ActorRef jobManager = JobManager.startJobManagerActors(
+				new Configuration(),
+				actorSystem,
+				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
+				JobManager.class,
+				MemoryArchivist.class)._1();
+
+			LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
+
+			// ================================================================
+			// Start TaskManager
+			// ================================================================
+			ResourceID tmResourceID = ResourceID.generate();
+
+			Tuple7<TaskManagerConfiguration, TaskManagerLocation, MemoryManager, IOManager, NetworkEnvironment, LeaderRetrievalService, MetricRegistry> components =
+				TaskManager.createTaskManagerComponents(
+					new Configuration(),
+					tmResourceID,
+					"localhost",
+					true,
+					Option.apply(leaderRetrievalService)
+				);
+
+			// create the task manager
+			final Props tmProps = TaskManager.getTaskManagerProps(
+				TaskManager.class,
+				components._1(),
+				tmResourceID,
+				components._2(),
+				components._3(),
+				components._4(),
+				components._5(),
+				components._6(),
+				components._7());
+
+			final ActorRef taskManager = actorSystem.actorOf(tmProps);
+
+			new JavaTestKit(actorSystem) {{
+				new Within(new FiniteDuration(60, TimeUnit.SECONDS)) {
+					@Override
+					protected void run() {
+						taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+							getTestActor());
+
+						// wait for the TM to be registered
+						expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+
+						// trigger re-registration of TM; this should include a disconnect from the current JM
+						taskManager.tell(new TaskManagerMessages.JobManagerLeaderAddress(jobManager.path().toString(), null), jobManager);
+
+						// wait for re-registration to be completed
+						taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+							getTestActor());
+
+						expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+					}
+				};
+			}};
+
+			// verify that the registry was not shut down due to the disconnect
+			MetricRegistry tmRegistry = components._7();
+			Assert.assertFalse(tmRegistry.isShutdown());
+
+			// shut down the actors and the actor system
+			actorSystem.shutdown();
+			actorSystem.awaitTermination();
+		} finally {
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+		}
+	}
+}


[2/2] flink git commit: [FLINK-5261] [metrics] Clean up meters in ScheduledDropwizardReporter

Posted by uc...@apache.org.
[FLINK-5261] [metrics] Clean up meters in ScheduledDropwizardReporter

This closes #2944.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7f7b487
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7f7b487
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7f7b487

Branch: refs/heads/master
Commit: f7f7b487b705e31062e5f00508a92bed5b185674
Parents: e5f4e3d
Author: zentol <ch...@apache.org>
Authored: Mon Dec 5 14:00:10 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Dec 6 14:06:51 2016 +0100

----------------------------------------------------------------------
 .../dropwizard/ScheduledDropwizardReporter.java | 19 +++-
 .../ScheduledDropwizardReporterTest.java        | 92 ++++++++++++++++++++
 2 files changed, 108 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7f7b487/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
index b7e83b6..380abc4 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -21,11 +21,11 @@ package org.apache.flink.dropwizard;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Reporter;
 import com.codahale.metrics.ScheduledReporter;
-
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
 import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
 import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
-import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
 import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
 import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper;
 import org.apache.flink.dropwizard.metrics.FlinkMeterWrapper;
@@ -82,15 +82,26 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
 	//  Getters
 	// ------------------------------------------------------------------------
 
-	// used for testing purposes
+	@VisibleForTesting
 	Map<Counter, String> getCounters() {
 		return counters;
 	}
 
+	@VisibleForTesting
 	Map<Meter, String> getMeters() {
 		return meters;
 	}
 
+	@VisibleForTesting
+	Map<Gauge<?>, String> getGauges() {
+		return gauges;
+	}
+
+	@VisibleForTesting
+	Map<Histogram, String> getHistograms() {
+		return histograms;
+	}
+
 	// ------------------------------------------------------------------------
 	//  life cycle
 	// ------------------------------------------------------------------------
@@ -157,6 +168,8 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
 				fullName = gauges.remove(metric);
 			} else if (metric instanceof Histogram) {
 				fullName = histograms.remove(metric);
+			} else if (metric instanceof Meter) {
+				fullName = meters.remove(metric);
 			} else {
 				fullName = null;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7f7b487/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 9385510..87c4ccb 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -24,9 +24,14 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -124,6 +129,93 @@ public class ScheduledDropwizardReporterTest {
 		metricRegistry.shutdown();
 	}
 
+	/**
+	 * This test verifies that metrics are properly added to and removed from the ScheduledDropwizardReporter and
+	 * the underlying Dropwizard MetricRegistry.
+	 */
+	@Test
+	public void testMetricCleanup() {
+		TestingScheduledDropwizardReporter rep = new TestingScheduledDropwizardReporter();
+
+		MetricGroup mp = new UnregisteredMetricsGroup();
+
+		Counter c = new SimpleCounter();
+		Meter m = new Meter() {
+			@Override
+			public void markEvent() {
+			}
+
+			@Override
+			public void markEvent(long n) {
+			}
+
+			@Override
+			public double getRate() {
+				return 0;
+			}
+
+			@Override
+			public long getCount() {
+				return 0;
+			}
+		};
+		Histogram h = new Histogram() {
+			@Override
+			public void update(long value) {
+
+			}
+
+			@Override
+			public long getCount() {
+				return 0;
+			}
+
+			@Override
+			public HistogramStatistics getStatistics() {
+				return null;
+			}
+		};
+		Gauge<Object> g = new Gauge<Object>() {
+			@Override
+			public Object getValue() {
+				return null;
+			}
+		};
+
+		rep.notifyOfAddedMetric(c, "counter", mp);
+		assertEquals(1, rep.getCounters().size());
+		assertEquals(1, rep.registry.getCounters().size());
+
+		rep.notifyOfAddedMetric(m, "meter", mp);
+		assertEquals(1, rep.getMeters().size());
+		assertEquals(1, rep.registry.getMeters().size());
+
+		rep.notifyOfAddedMetric(h, "histogram", mp);
+		assertEquals(1, rep.getHistograms().size());
+		assertEquals(1, rep.registry.getHistograms().size());
+
+		rep.notifyOfAddedMetric(g, "gauge", mp);
+		assertEquals(1, rep.getGauges().size());
+		assertEquals(1, rep.registry.getGauges().size());
+
+
+		rep.notifyOfRemovedMetric(c, "counter", mp);
+		assertEquals(0, rep.getCounters().size());
+		assertEquals(0, rep.registry.getCounters().size());
+
+		rep.notifyOfRemovedMetric(m, "meter", mp);
+		assertEquals(0, rep.getMeters().size());
+		assertEquals(0, rep.registry.getMeters().size());
+
+		rep.notifyOfRemovedMetric(h, "histogram", mp);
+		assertEquals(0, rep.getHistograms().size());
+		assertEquals(0, rep.registry.getHistograms().size());
+
+		rep.notifyOfRemovedMetric(g, "gauge", mp);
+		assertEquals(0, rep.getGauges().size());
+		assertEquals(0, rep.registry.getGauges().size());
+	}
+
 	public static class TestingScheduledDropwizardReporter extends ScheduledDropwizardReporter {
 
 		@Override