You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/07/07 14:42:19 UTC

[flink] branch master updated (6d7a15e -> 21468e0)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6d7a15e  [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
     new 0607fa5  [hotfix][runtime] Make TestingComponentMainThreadExecutor accept ComponentMainThreadExecutor
     new 5dc2a17  [hotfix][runtime] Fix main thread check in ComponentMainThreadExecutorServiceAdapter
     new 21468e0  [hotfix][runtime] Merge TestingComponentMainThreadExecutorServiceAdapter and ComponentMainThreadExecutorServiceAdapter

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ExecutionGraphCheckpointCoordinatorTest.java   |  4 +-
 .../ComponentMainThreadExecutorServiceAdapter.java | 41 ++++++++++----
 .../executiongraph/ArchivedExecutionGraphTest.java |  3 +-
 ...ncurrentFailoverStrategyExecutionGraphTest.java |  4 +-
 .../ExecutionGraphCoLocationRestartTest.java       |  3 +-
 .../ExecutionGraphDeploymentTest.java              | 11 ++--
 .../executiongraph/ExecutionGraphMetricsTest.java  |  4 +-
 .../ExecutionGraphPartitionReleaseTest.java        |  3 +-
 .../executiongraph/ExecutionGraphRestartTest.java  |  6 +-
 .../ExecutionGraphSchedulingTest.java              | 15 ++---
 .../executiongraph/ExecutionGraphSuspendTest.java  |  5 +-
 .../executiongraph/ExecutionGraphTestUtils.java    |  3 +-
 .../ExecutionGraphVariousFailuesTest.java          |  7 ++-
 .../runtime/executiongraph/ExecutionTest.java      | 11 ++--
 .../ExecutionVertexInputConstraintTest.java        |  4 +-
 .../runtime/executiongraph/FailoverRegionTest.java |  9 +--
 .../executiongraph/FinalizeOnMasterTest.java       |  5 +-
 .../executiongraph/GlobalModVersionTest.java       |  3 +-
 .../TestingComponentMainThreadExecutor.java        | 10 ++--
 ...gComponentMainThreadExecutorServiceAdapter.java | 64 ----------------------
 .../jobmanager/scheduler/SchedulerTestBase.java    |  4 +-
 .../jobmaster/slotpool/SlotPoolImplTest.java       |  4 +-
 .../jobmaster/slotpool/SlotPoolResource.java       |  4 +-
 23 files changed, 102 insertions(+), 125 deletions(-)
 rename flink-runtime/src/{main => test}/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java (70%)
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutorServiceAdapter.java


[flink] 01/03: [hotfix][runtime] Make TestingComponentMainThreadExecutor accept ComponentMainThreadExecutor

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0607fa50de42e38ec631e67ba3bbc9985b403aac
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Jul 4 17:14:09 2019 +0200

    [hotfix][runtime] Make TestingComponentMainThreadExecutor accept ComponentMainThreadExecutor
---
 .../runtime/executiongraph/TestingComponentMainThreadExecutor.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
index 93131a3..56a39c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.function.FunctionUtils;
 import org.apache.flink.util.function.SupplierWithException;
@@ -39,10 +40,10 @@ public class TestingComponentMainThreadExecutor {
 
 	/** The main thread executor to which execution is delegated. */
 	@Nonnull
-	private final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor;
+	private final ComponentMainThreadExecutor mainThreadExecutor;
 
 	public TestingComponentMainThreadExecutor(
-		@Nonnull TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor) {
+			@Nonnull ComponentMainThreadExecutor mainThreadExecutor) {
 		this.mainThreadExecutor = mainThreadExecutor;
 	}
 
@@ -68,7 +69,7 @@ public class TestingComponentMainThreadExecutor {
 	}
 
 	@Nonnull
-	public TestingComponentMainThreadExecutorServiceAdapter getMainThreadExecutor() {
+	public ComponentMainThreadExecutor getMainThreadExecutor() {
 		return mainThreadExecutor;
 	}
 


[flink] 03/03: [hotfix][runtime] Merge TestingComponentMainThreadExecutorServiceAdapter and ComponentMainThreadExecutorServiceAdapter

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21468e0050dc5f97de5cfe39885e0d3fd648e399
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Jul 4 18:07:22 2019 +0200

    [hotfix][runtime] Merge TestingComponentMainThreadExecutorServiceAdapter and ComponentMainThreadExecutorServiceAdapter
    
    Merge TestingComponentMainThreadExecutorServiceAdapter with
    ComponentMainThreadExecutorServiceAdapter because it only adds static methods.
    Move ComponentMainThreadExecutorServiceAdapter to test sources
    
    This closes #8989.
---
 .../ExecutionGraphCheckpointCoordinatorTest.java   |  4 +-
 .../ComponentMainThreadExecutorServiceAdapter.java | 41 ++++++++++----
 .../executiongraph/ArchivedExecutionGraphTest.java |  3 +-
 ...ncurrentFailoverStrategyExecutionGraphTest.java |  4 +-
 .../ExecutionGraphCoLocationRestartTest.java       |  3 +-
 .../ExecutionGraphDeploymentTest.java              | 11 ++--
 .../executiongraph/ExecutionGraphMetricsTest.java  |  4 +-
 .../ExecutionGraphPartitionReleaseTest.java        |  3 +-
 .../executiongraph/ExecutionGraphRestartTest.java  |  6 +-
 .../ExecutionGraphSchedulingTest.java              | 15 ++---
 .../executiongraph/ExecutionGraphSuspendTest.java  |  5 +-
 .../executiongraph/ExecutionGraphTestUtils.java    |  3 +-
 .../ExecutionGraphVariousFailuesTest.java          |  7 ++-
 .../runtime/executiongraph/ExecutionTest.java      | 11 ++--
 .../ExecutionVertexInputConstraintTest.java        |  4 +-
 .../runtime/executiongraph/FailoverRegionTest.java |  9 +--
 .../executiongraph/FinalizeOnMasterTest.java       |  5 +-
 .../executiongraph/GlobalModVersionTest.java       |  3 +-
 .../TestingComponentMainThreadExecutor.java        |  3 +-
 ...gComponentMainThreadExecutorServiceAdapter.java | 64 ----------------------
 .../jobmanager/scheduler/SchedulerTestBase.java    |  4 +-
 .../jobmaster/slotpool/SlotPoolImplTest.java       |  4 +-
 .../jobmaster/slotpool/SlotPoolResource.java       |  4 +-
 23 files changed, 97 insertions(+), 123 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 267b685..5cd1dba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.DummyJobInformation;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -152,7 +152,7 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
 			VoidBlobWriter.getInstance(),
 			timeout);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
 			100,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
similarity index 72%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
index e3b0690..b255d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
@@ -19,14 +19,16 @@
 package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+
+import javax.annotation.Nonnull;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Adapter class for a {@link ScheduledExecutorService} or {@link ScheduledExecutor} which shall be used as a
  * {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the
@@ -41,23 +43,38 @@ public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainT
 
 	public ComponentMainThreadExecutorServiceAdapter(
 			final ScheduledExecutorService scheduledExecutorService,
-			final Runnable mainThreadCheck) {
-		this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThreadCheck);
+			final Thread mainThread) {
+		this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThread);
 	}
 
 	public ComponentMainThreadExecutorServiceAdapter(
-			final ScheduledExecutor scheduledExecutorService,
+			final ScheduledExecutor scheduledExecutor,
 			final Thread mainThread) {
-		this(scheduledExecutorService, () -> {
+		this.scheduledExecutor = scheduledExecutor;
+		this.mainThreadCheck = () -> {
 			assert MainThreadValidatorUtil.isRunningInExpectedThread(mainThread);
-		});
+		};
 	}
 
-	private ComponentMainThreadExecutorServiceAdapter(
-			final ScheduledExecutor scheduledExecutor,
-			final Runnable mainThreadCheck) {
-		this.scheduledExecutor = checkNotNull(scheduledExecutor);
-		this.mainThreadCheck = checkNotNull(mainThreadCheck);
+	public static ComponentMainThreadExecutor forMainThread() {
+		final Thread main = Thread.currentThread();
+		return new ComponentMainThreadExecutorServiceAdapter(new DirectScheduledExecutorService() {
+			@Override
+			public void execute(Runnable command) {
+				assert MainThreadValidatorUtil.isRunningInExpectedThread(main);
+				super.execute(command);
+			}
+		}, main);
+	}
+
+	/**
+	 * Creates a test executor that delegates to the given {@link ScheduledExecutorService}. The given executor must
+	 * execute all submissions with the same thread.
+	 */
+	public static ComponentMainThreadExecutor forSingleThreadExecutor(
+			@Nonnull ScheduledExecutorService singleThreadExecutor) {
+		final Thread thread = CompletableFuture.supplyAsync(Thread::currentThread, singleThreadExecutor).join();
+		return new ComponentMainThreadExecutorServiceAdapter(singleThreadExecutor, thread);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 0d481d6..72d8234 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -113,7 +114,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 			new NoRestartStrategy(),
 			mock(SlotProvider.class));
 
-		runtimeGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		runtimeGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		runtimeGraph.attachJobGraph(vertices);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
index 64c9ae4..3198dc1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -86,7 +88,7 @@ import static org.mockito.Mockito.when;
  */
 public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger {
 
-	private final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+	private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 	/**
 	 * Tests that a cancellation concurrent to a local failover leads to a properly
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
index d32e2d6..e76bc47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -74,7 +75,7 @@ public class ExecutionGraphCoLocationRestartTest extends SchedulerTestBase {
 
 		// enable the queued scheduling for the slot pool
 		eg.setQueuedSchedulingAllowed(true);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 7ad1dbb..bf2b40d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -186,7 +187,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 				blobWriter,
 				AkkaUtils.getDefaultTimeout());
 
-			eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+			eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 			checkJobOffloaded(eg);
 
@@ -473,7 +474,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			blobWriter,
 			AkkaUtils.getDefaultTimeout());
 
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		checkJobOffloaded(eg);
 
@@ -556,7 +557,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			AkkaUtils.getDefaultTimeout());
 		checkJobOffloaded(eg);
 
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		eg.setQueuedSchedulingAllowed(false);
 
@@ -636,7 +637,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			sourceVertex,
 			sinkVertex);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		executionGraph.setScheduleMode(ScheduleMode.EAGER);
 		executionGraph.scheduleForExecution();
 
@@ -726,7 +727,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		executionGraph.setScheduleMode(ScheduleMode.EAGER);
 		executionGraph.setQueuedSchedulingAllowed(true);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		executionGraph.scheduleForExecution();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 878a736..72d3416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
@@ -49,7 +51,7 @@ import static org.junit.Assert.assertTrue;
 
 public class ExecutionGraphMetricsTest extends TestLogger {
 
-	private final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+	private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 	/**
 	 * This test tests that the restarting time metric correctly displays restarting times.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index 0110642..7d2369f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -60,7 +61,7 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger {
 	private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 	private static final TestingComponentMainThreadExecutor mainThreadExecutor =
 		new TestingComponentMainThreadExecutor(
-			TestingComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));
+			ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));
 
 	@Test
 	public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index e78fa6f..70df983 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -101,8 +103,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
 
-	private static final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor =
-		TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+	private static final ComponentMainThreadExecutor mainThreadExecutor =
+		ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 	private static final JobID TEST_JOB_ID = new JobID();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 9ceaf84..bdbcf94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -131,7 +132,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
 
 		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		//  set up two TaskManager gateways and slots
 
@@ -231,7 +232,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		//
 		//  kick off the scheduling
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.setQueuedSchedulingAllowed(true);
 		eg.scheduleForExecution();
@@ -318,7 +319,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotProvider.addSlots(targetVertex.getID(), targetFutures);
 
 		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		//
 		//  we complete some of the futures
@@ -398,7 +399,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotFutures[1].complete(slots[1]);
 
 		//  kick off the scheduling
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.setQueuedSchedulingAllowed(true);
 		eg.scheduleForExecution();
@@ -442,7 +443,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotProvider.addSlots(jobVertex.getID(), new CompletableFuture[]{slotFuture1, slotFuture2});
 		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		executionGraph.scheduleForExecution();
 
 		final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
@@ -490,7 +491,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		executionGraph.scheduleForExecution();
 
 		assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
@@ -539,7 +540,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		final Set<SlotRequestId> slotRequestIdsToReturn = ConcurrentHashMap.newKeySet(slotRequestIds.size());
 
 		executionGraph.scheduleForExecution();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 2d5dd67..01e2146 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
@@ -221,7 +222,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 	@Test
 	public void testSuspendWhileRestarting() throws Exception {
 		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
@@ -296,7 +297,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 			slotProvider,
 			new FixedDelayRestartStrategy(0, 0),
 			vertex);
-		simpleTestGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		simpleTestGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		return simpleTestGraph;
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 8623eda..3143074 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -448,7 +449,7 @@ public class ExecutionGraphTestUtils {
 			.setFutureExecutor(executor)
 			.build();
 
-		graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		return new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout());
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
index e29902e..67c3e8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -41,7 +42,7 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger {
 	@Test
 	public void testFailureWhileRestarting() throws Exception {
 		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(2));
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
@@ -72,7 +73,7 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger {
 	@Test
 	public void testSuppressRestartFailureWhileRestarting() throws Exception {
 		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
@@ -97,7 +98,7 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger {
 	@Test
 	public void testFailingScheduleOrUpdateConsumers() throws Exception {
 		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 3a0770e..c2db133 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -119,7 +120,7 @@ public class ExecutionTest extends TestLogger {
 			new NoRestartStrategy(),
 			jobVertex);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
 
@@ -181,7 +182,7 @@ public class ExecutionTest extends TestLogger {
 			new NoRestartStrategy(),
 			jobVertex);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
 
@@ -283,7 +284,7 @@ public class ExecutionTest extends TestLogger {
 			new NoRestartStrategy(),
 			jobVertex);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
 
@@ -393,7 +394,7 @@ public class ExecutionTest extends TestLogger {
 			new NoRestartStrategy(),
 			jobVertex);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
 
@@ -596,7 +597,7 @@ public class ExecutionTest extends TestLogger {
 			NettyShuffleMaster.INSTANCE,
 			partitionTracker);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
 		final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
index 15ff07d..9aaed7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
@@ -51,7 +53,7 @@ import static org.junit.Assert.assertTrue;
  */
 public class ExecutionVertexInputConstraintTest extends TestLogger {
 
-	private TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+	private ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 	@Test
 	public void testInputConsumable() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 7575dcd..0a7f742 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -201,7 +202,7 @@ public class FailoverRegionTest extends TestLogger {
 
 		enableCheckpointing(eg);
 
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
@@ -321,7 +322,7 @@ public class FailoverRegionTest extends TestLogger {
 			e.printStackTrace();
 			fail("Job failed with exception: " + e.getMessage());
 		}
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 		RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
 
@@ -482,7 +483,7 @@ public class FailoverRegionTest extends TestLogger {
 			slotProvider);
 
 		eg.attachJobGraph(ordered);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
 
@@ -582,7 +583,7 @@ public class FailoverRegionTest extends TestLogger {
 
 		enableCheckpointing(eg);
 
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 
 		attachPendingCheckpoints(eg);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
index df882ff..1f344fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -54,7 +55,7 @@ public class FinalizeOnMasterTest extends TestLogger {
 		vertex2.setParallelism(2);
 
 		final ExecutionGraph eg = createSimpleTestGraph(jid, vertex1, vertex2);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 		
@@ -79,7 +80,7 @@ public class FinalizeOnMasterTest extends TestLogger {
 		vertex.setParallelism(1);
 
 		final ExecutionGraph eg = createSimpleTestGraph(jid, vertex);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index e1b7da6..e47d489 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
@@ -171,7 +172,7 @@ public class GlobalModVersionTest extends TestLogger {
 			new CustomStrategy(failoverStrategy),
 			slotProvider);
 
-		graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
index 56a39c0..2771cb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.function.FunctionUtils;
 import org.apache.flink.util.function.SupplierWithException;
@@ -95,7 +96,7 @@ public class TestingComponentMainThreadExecutor {
 			this.innerExecutorService = Executors.newSingleThreadScheduledExecutor();
 			this.componentMainThreadTestExecutor =
 				new TestingComponentMainThreadExecutor(
-					TestingComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(innerExecutorService));
+					ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(innerExecutorService));
 		}
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutorServiceAdapter.java
deleted file mode 100644
index 7add060..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutorServiceAdapter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
-import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
-
-import javax.annotation.Nonnull;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * An implementation of {@link org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor} for tests.
- */
-public class TestingComponentMainThreadExecutorServiceAdapter extends ComponentMainThreadExecutorServiceAdapter {
-
-	public TestingComponentMainThreadExecutorServiceAdapter(
-		@Nonnull ScheduledExecutorService scheduledExecutorService,
-		@Nonnull Thread mainThread) {
-
-		super(scheduledExecutorService, () -> {
-			assert MainThreadValidatorUtil.isRunningInExpectedThread(mainThread);
-		});
-	}
-
-	public static TestingComponentMainThreadExecutorServiceAdapter forMainThread() {
-		final Thread main = Thread.currentThread();
-		return new TestingComponentMainThreadExecutorServiceAdapter(new DirectScheduledExecutorService() {
-			@Override
-			public void execute(Runnable command) {
-				assert MainThreadValidatorUtil.isRunningInExpectedThread(main);
-				super.execute(command);
-			}
-		}, main);
-	}
-
-	/**
-	 * Creates a test executor that delegates to the given {@link ScheduledExecutorService}. The given executor must
-	 * execute all submissions with the same thread.
-	 */
-	public static TestingComponentMainThreadExecutorServiceAdapter forSingleThreadExecutor(
-		@Nonnull ScheduledExecutorService singleThreadExecutor) {
-		Thread thread = CompletableFuture.supplyAsync(Thread::currentThread, singleThreadExecutor).join();
-		return new TestingComponentMainThreadExecutorServiceAdapter(singleThreadExecutor, thread);
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index fca13aa..873793f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -81,7 +81,7 @@ public class SchedulerTestBase extends TestLogger {
 
 		final JobMasterId jobMasterId = JobMasterId.generate();
 		final String jobManagerAddress = "localhost";
-		ComponentMainThreadExecutor executor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+		ComponentMainThreadExecutor executor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
 		slotPool.start(jobMasterId, jobManagerAddress, executor);
 		testingScheduler.start(executor);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
index 1ff1bc3..fcbbbb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
@@ -100,7 +100,7 @@ public class SlotPoolImplTest extends TestLogger {
 	private TestingResourceManagerGateway resourceManagerGateway;
 
 	private ComponentMainThreadExecutor mainThreadExecutor =
-		TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+		ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 	@Before
 	public void setUp() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
index ce8aedb..7762f3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 
@@ -46,7 +46,7 @@ public class SlotPoolResource extends ExternalResource {
 
 	public SlotPoolResource(@Nonnull SlotSelectionStrategy schedulingStrategy) {
 		this.schedulingStrategy = schedulingStrategy;
-		this.mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+		this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
 		slotPool = null;
 		testingResourceManagerGateway = null;
 	}


[flink] 02/03: [hotfix][runtime] Fix main thread check in ComponentMainThreadExecutorServiceAdapter

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5dc2a17891cc75b4e4a522a6ba15f253452a3b8f
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Jul 4 17:23:29 2019 +0200

    [hotfix][runtime] Fix main thread check in ComponentMainThreadExecutorServiceAdapter
---
 .../runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
index c71675a..e3b0690 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
@@ -48,7 +48,9 @@ public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainT
 	public ComponentMainThreadExecutorServiceAdapter(
 			final ScheduledExecutor scheduledExecutorService,
 			final Thread mainThread) {
-		this(scheduledExecutorService, () -> MainThreadValidatorUtil.isRunningInExpectedThread(mainThread));
+		this(scheduledExecutorService, () -> {
+			assert MainThreadValidatorUtil.isRunningInExpectedThread(mainThread);
+		});
 	}
 
 	private ComponentMainThreadExecutorServiceAdapter(