You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/03 17:16:09 UTC
[flink] branch release-1.6 updated: [FLINK-10761][metrics] Do not
acquire lock for getAllVariables()
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new cb85e62 [FLINK-10761][metrics] Do not acquire lock for getAllVariables()
cb85e62 is described below
commit cb85e62b5b122b7cbb9eeb72a116dd133467a920
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jan 3 18:13:48 2019 +0100
[FLINK-10761][metrics] Do not acquire lock for getAllVariables()
---
.../metrics/groups/AbstractMetricGroup.java | 16 ++--
.../metrics/groups/AbstractMetricGroupTest.java | 90 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 10 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 4400b14..fb32130 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -111,17 +111,13 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
}
public Map<String, String> getAllVariables() {
- if (variables == null) { // avoid synchronization for common case
- synchronized (this) {
- if (variables == null) {
- Map<String, String> tmpVariables = new HashMap<>();
- putVariables(tmpVariables);
- if (parent != null) { // not true for Job-/TaskManagerMetricGroup
- tmpVariables.putAll(parent.getAllVariables());
- }
- variables = tmpVariables;
- }
+ if (variables == null) {
+ Map<String, String> tmpVariables = new HashMap<>();
+ putVariables(tmpVariables);
+ if (parent != null) { // not true for Job-/TaskManagerMetricGroup
+ tmpVariables.putAll(parent.getAllVariables());
}
+ variables = tmpVariables;
}
return variables;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index f3f8b42..d750f63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -21,17 +21,22 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.metrics.util.TestReporter;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -252,4 +257,89 @@ public class AbstractMetricGroupTest {
testRegistry.shutdown().get();
}
}
+
+ @Test
+ public void testGetAllVariablesDoesNotDeadlock() throws InterruptedException {
+ final TestMetricRegistry registry = new TestMetricRegistry();
+
+ final MetricGroup parent = new GenericMetricGroup(registry, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), "parent");
+ final MetricGroup child = parent.addGroup("child");
+
+ final Thread parentRegisteringThread = new Thread(() -> parent.counter("parent_counter"));
+ final Thread childRegisteringThread = new Thread(() -> child.counter("child_counter"));
+
+ final BlockerSync parentSync = new BlockerSync();
+ final BlockerSync childSync = new BlockerSync();
+
+ try {
+ // start both threads and have them block in the registry, so they acquire the lock of their respective group
+ registry.setOnRegistrationAction(childSync::blockNonInterruptible);
+ childRegisteringThread.start();
+ childSync.awaitBlocker();
+
+ registry.setOnRegistrationAction(parentSync::blockNonInterruptible);
+ parentRegisteringThread.start();
+ parentSync.awaitBlocker();
+
+ // the parent thread remains blocked to simulate the child thread holding some lock in the registry/reporter
+ // the child thread continues execution and calls getAllVariables()
+ // in the past this would block indefinitely since the method acquires the locks of all parent groups
+ childSync.releaseBlocker();
+ // wait with a timeout to ensure the finally block is executed _at some point_, un-blocking the parent
+ childRegisteringThread.join(1000 * 10);
+
+ parentSync.releaseBlocker();
+ parentRegisteringThread.join();
+ } finally {
+ parentSync.releaseBlocker();
+ childSync.releaseBlocker();
+ parentRegisteringThread.join();
+ childRegisteringThread.join();
+ }
+ }
+
+ private static final class TestMetricRegistry implements MetricRegistry {
+
+ private Runnable onRegistrationAction;
+
+ void setOnRegistrationAction(Runnable onRegistrationAction) {
+ this.onRegistrationAction = onRegistrationAction;
+ }
+
+ @Override
+ public char getDelimiter() {
+ return 0;
+ }
+
+ @Override
+ public char getDelimiter(int index) {
+ return 0;
+ }
+
+ @Override
+ public int getNumberReporters() {
+ return 0;
+ }
+
+ @Override
+ public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+ onRegistrationAction.run();
+ group.getAllVariables();
+ }
+
+ @Override
+ public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+ }
+
+ @Override
+ public ScopeFormats getScopeFormats() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getMetricQueryServicePath() {
+ return null;
+ }
+ }
}