You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/03 17:16:09 UTC

[flink] branch release-1.6 updated: [FLINK-10761][metrics] Do not acquire lock for getAllVariables()

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new cb85e62  [FLINK-10761][metrics] Do not acquire lock for getAllVariables()
cb85e62 is described below

commit cb85e62b5b122b7cbb9eeb72a116dd133467a920
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jan 3 18:13:48 2019 +0100

    [FLINK-10761][metrics] Do not acquire lock for getAllVariables()
---
 .../metrics/groups/AbstractMetricGroup.java        | 16 ++--
 .../metrics/groups/AbstractMetricGroupTest.java    | 90 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 4400b14..fb32130 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -111,17 +111,13 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	}
 
 	public Map<String, String> getAllVariables() {
-		if (variables == null) { // avoid synchronization for common case
-			synchronized (this) {
-				if (variables == null) {
-					Map<String, String> tmpVariables = new HashMap<>();
-					putVariables(tmpVariables);
-					if (parent != null) { // not true for Job-/TaskManagerMetricGroup
-						tmpVariables.putAll(parent.getAllVariables());
-					}
-					variables = tmpVariables;
-				}
+		if (variables == null) {
+			Map<String, String> tmpVariables = new HashMap<>();
+			putVariables(tmpVariables);
+			if (parent != null) { // not true for Job-/TaskManagerMetricGroup
+				tmpVariables.putAll(parent.getAllVariables());
 			}
+			variables = tmpVariables;
 		}
 		return variables;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index f3f8b42..d750f63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -21,17 +21,22 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.runtime.metrics.util.TestReporter;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -252,4 +257,89 @@ public class AbstractMetricGroupTest {
 			testRegistry.shutdown().get();
 		}
 	}
+
+	@Test
+	public void testGetAllVariablesDoesNotDeadlock() throws InterruptedException {
+		final TestMetricRegistry registry = new TestMetricRegistry();
+
+		final MetricGroup parent = new GenericMetricGroup(registry, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), "parent");
+		final MetricGroup child = parent.addGroup("child");
+
+		final Thread parentRegisteringThread = new Thread(() -> parent.counter("parent_counter"));
+		final Thread childRegisteringThread = new Thread(() -> child.counter("child_counter"));
+
+		final BlockerSync parentSync = new BlockerSync();
+		final BlockerSync childSync = new BlockerSync();
+
+		try {
+			// start both threads and have them block in the registry while each one holds the lock of its respective group
+			registry.setOnRegistrationAction(childSync::blockNonInterruptible);
+			childRegisteringThread.start();
+			childSync.awaitBlocker();
+
+			registry.setOnRegistrationAction(parentSync::blockNonInterruptible);
+			parentRegisteringThread.start();
+			parentSync.awaitBlocker();
+
+			// the parent thread remains blocked (holding the parent group's lock) to simulate it waiting for some lock that the child thread holds in the registry/reporter
+			// the child thread continues execution and calls getAllVariables()
+			// in the past this would block indefinitely, since the method synchronized on each group up the hierarchy whose variables were not yet computed, including the locked parent
+			childSync.releaseBlocker();
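+			// wait with a timeout to ensure the finally block is executed _at some point_, un-blocking the parent; note that the outcome of this join is not asserted, so a regression would delay this test rather than fail it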
+			childRegisteringThread.join(1000 * 10);
+
+			parentSync.releaseBlocker();
+			parentRegisteringThread.join();
+		} finally {
+			parentSync.releaseBlocker();
+			childSync.releaseBlocker();
+			parentRegisteringThread.join();
+			childRegisteringThread.join();
+		}
+	}
+
+	private static final class TestMetricRegistry implements MetricRegistry {
+
+		private Runnable onRegistrationAction;
+
+		void setOnRegistrationAction(Runnable onRegistrationAction) {
+			this.onRegistrationAction = onRegistrationAction;
+		}
+
+		@Override
+		public char getDelimiter() {
+			return 0;
+		}
+
+		@Override
+		public char getDelimiter(int index) {
+			return 0;
+		}
+
+		@Override
+		public int getNumberReporters() {
+			return 0;
+		}
+
+		@Override
+		public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+			onRegistrationAction.run();
+			group.getAllVariables();
+		}
+
+		@Override
+		public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+		}
+
+		@Override
+		public ScopeFormats getScopeFormats() {
+			return null;
+		}
+
+		@Nullable
+		@Override
+		public String getMetricQueryServicePath() {
+			return null;
+		}
+	}
 }