You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/07/07 14:42:22 UTC

[flink] 03/03: [hotfix][runtime] Merge TestingComponentMainThreadExecutorServiceAdapter and ComponentMainThreadExecutorServiceAdapter

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21468e0050dc5f97de5cfe39885e0d3fd648e399
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Jul 4 18:07:22 2019 +0200

    [hotfix][runtime] Merge TestingComponentMainThreadExecutorServiceAdapter and ComponentMainThreadExecutorServiceAdapter
    
    Merge TestingComponentMainThreadExecutorServiceAdapter with
    ComponentMainThreadExecutorServiceAdapter because it only adds static methods.
    Move ComponentMainThreadExecutorServiceAdapter to test sources
    
    This closes #8989.
---
 .../ExecutionGraphCheckpointCoordinatorTest.java   |  4 +-
 .../ComponentMainThreadExecutorServiceAdapter.java | 41 ++++++++++----
 .../executiongraph/ArchivedExecutionGraphTest.java |  3 +-
 ...ncurrentFailoverStrategyExecutionGraphTest.java |  4 +-
 .../ExecutionGraphCoLocationRestartTest.java       |  3 +-
 .../ExecutionGraphDeploymentTest.java              | 11 ++--
 .../executiongraph/ExecutionGraphMetricsTest.java  |  4 +-
 .../ExecutionGraphPartitionReleaseTest.java        |  3 +-
 .../executiongraph/ExecutionGraphRestartTest.java  |  6 +-
 .../ExecutionGraphSchedulingTest.java              | 15 ++---
 .../executiongraph/ExecutionGraphSuspendTest.java  |  5 +-
 .../executiongraph/ExecutionGraphTestUtils.java    |  3 +-
 .../ExecutionGraphVariousFailuesTest.java          |  7 ++-
 .../runtime/executiongraph/ExecutionTest.java      | 11 ++--
 .../ExecutionVertexInputConstraintTest.java        |  4 +-
 .../runtime/executiongraph/FailoverRegionTest.java |  9 +--
 .../executiongraph/FinalizeOnMasterTest.java       |  5 +-
 .../executiongraph/GlobalModVersionTest.java       |  3 +-
 .../TestingComponentMainThreadExecutor.java        |  3 +-
 ...gComponentMainThreadExecutorServiceAdapter.java | 64 ----------------------
 .../jobmanager/scheduler/SchedulerTestBase.java    |  4 +-
 .../jobmaster/slotpool/SlotPoolImplTest.java       |  4 +-
 .../jobmaster/slotpool/SlotPoolResource.java       |  4 +-
 23 files changed, 97 insertions(+), 123 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 267b685..5cd1dba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.DummyJobInformation;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -152,7 +152,7 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
 			VoidBlobWriter.getInstance(),
 			timeout);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
 			100,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
similarity index 72%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
index e3b0690..b255d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
@@ -19,14 +19,16 @@
 package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+
+import javax.annotation.Nonnull;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Adapter class for a {@link ScheduledExecutorService} or {@link ScheduledExecutor} which shall be used as a
  * {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the
@@ -41,23 +43,38 @@ public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainT
 
 	public ComponentMainThreadExecutorServiceAdapter(
 			final ScheduledExecutorService scheduledExecutorService,
-			final Runnable mainThreadCheck) {
-		this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThreadCheck);
+			final Thread mainThread) {
+		this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThread);
 	}
 
 	public ComponentMainThreadExecutorServiceAdapter(
-			final ScheduledExecutor scheduledExecutorService,
+			final ScheduledExecutor scheduledExecutor,
 			final Thread mainThread) {
-		this(scheduledExecutorService, () -> {
+		this.scheduledExecutor = scheduledExecutor;
+		this.mainThreadCheck = () -> {
 			assert MainThreadValidatorUtil.isRunningInExpectedThread(mainThread);
-		});
+		};
 	}
 
-	private ComponentMainThreadExecutorServiceAdapter(
-			final ScheduledExecutor scheduledExecutor,
-			final Runnable mainThreadCheck) {
-		this.scheduledExecutor = checkNotNull(scheduledExecutor);
-		this.mainThreadCheck = checkNotNull(mainThreadCheck);
+	public static ComponentMainThreadExecutor forMainThread() {
+		final Thread main = Thread.currentThread();
+		return new ComponentMainThreadExecutorServiceAdapter(new DirectScheduledExecutorService() {
+			@Override
+			public void execute(Runnable command) {
+				assert MainThreadValidatorUtil.isRunningInExpectedThread(main);
+				super.execute(command);
+			}
+		}, main);
+	}
+
+	/**
+	 * Creates a test executor that delegates to the given {@link ScheduledExecutorService}. The given executor must
+	 * execute all submissions with the same thread.
+	 */
+	public static ComponentMainThreadExecutor forSingleThreadExecutor(
+			@Nonnull ScheduledExecutorService singleThreadExecutor) {
+		final Thread thread = CompletableFuture.supplyAsync(Thread::currentThread, singleThreadExecutor).join();
+		return new ComponentMainThreadExecutorServiceAdapter(singleThreadExecutor, thread);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 0d481d6..72d8234 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -113,7 +114,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 			new NoRestartStrategy(),
 			mock(SlotProvider.class));
 
-		runtimeGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		runtimeGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		runtimeGraph.attachJobGraph(vertices);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
index 64c9ae4..3198dc1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -86,7 +88,7 @@ import static org.mockito.Mockito.when;
  */
 public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger {
 
-	private final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+	private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 	/**
 	 * Tests that a cancellation concurrent to a local failover leads to a properly
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
index d32e2d6..e76bc47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -74,7 +75,7 @@ public class ExecutionGraphCoLocationRestartTest extends SchedulerTestBase {
 
 		// enable the queued scheduling for the slot pool
 		eg.setQueuedSchedulingAllowed(true);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 7ad1dbb..bf2b40d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -186,7 +187,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 				blobWriter,
 				AkkaUtils.getDefaultTimeout());
 
-			eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+			eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 			checkJobOffloaded(eg);
 
@@ -473,7 +474,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			blobWriter,
 			AkkaUtils.getDefaultTimeout());
 
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		checkJobOffloaded(eg);
 
@@ -556,7 +557,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			AkkaUtils.getDefaultTimeout());
 		checkJobOffloaded(eg);
 
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		eg.setQueuedSchedulingAllowed(false);
 
@@ -636,7 +637,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			sourceVertex,
 			sinkVertex);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		executionGraph.setScheduleMode(ScheduleMode.EAGER);
 		executionGraph.scheduleForExecution();
 
@@ -726,7 +727,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		executionGraph.setScheduleMode(ScheduleMode.EAGER);
 		executionGraph.setQueuedSchedulingAllowed(true);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		executionGraph.scheduleForExecution();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 878a736..72d3416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
@@ -49,7 +51,7 @@ import static org.junit.Assert.assertTrue;
 
 public class ExecutionGraphMetricsTest extends TestLogger {
 
-	private final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+	private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 	/**
 	 * This test tests that the restarting time metric correctly displays restarting times.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index 0110642..7d2369f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -60,7 +61,7 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger {
 	private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 	private static final TestingComponentMainThreadExecutor mainThreadExecutor =
 		new TestingComponentMainThreadExecutor(
-			TestingComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));
+			ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));
 
 	@Test
 	public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index e78fa6f..70df983 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -101,8 +103,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
 
-	private static final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor =
-		TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+	private static final ComponentMainThreadExecutor mainThreadExecutor =
+		ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 	private static final JobID TEST_JOB_ID = new JobID();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 9ceaf84..bdbcf94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -131,7 +132,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
 
 		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		//  set up two TaskManager gateways and slots
 
@@ -231,7 +232,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		//
 		//  kick off the scheduling
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.setQueuedSchedulingAllowed(true);
 		eg.scheduleForExecution();
@@ -318,7 +319,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotProvider.addSlots(targetVertex.getID(), targetFutures);
 
 		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		//
 		//  we complete some of the futures
@@ -398,7 +399,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotFutures[1].complete(slots[1]);
 
 		//  kick off the scheduling
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.setQueuedSchedulingAllowed(true);
 		eg.scheduleForExecution();
@@ -442,7 +443,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotProvider.addSlots(jobVertex.getID(), new CompletableFuture[]{slotFuture1, slotFuture2});
 		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		executionGraph.scheduleForExecution();
 
 		final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
@@ -490,7 +491,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		executionGraph.scheduleForExecution();
 
 		assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
@@ -539,7 +540,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		final Set<SlotRequestId> slotRequestIdsToReturn = ConcurrentHashMap.newKeySet(slotRequestIds.size());
 
 		executionGraph.scheduleForExecution();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 2d5dd67..01e2146 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
@@ -221,7 +222,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 	@Test
 	public void testSuspendWhileRestarting() throws Exception {
 		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
@@ -296,7 +297,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 			slotProvider,
 			new FixedDelayRestartStrategy(0, 0),
 			vertex);
-		simpleTestGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		simpleTestGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		return simpleTestGraph;
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 8623eda..3143074 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -448,7 +449,7 @@ public class ExecutionGraphTestUtils {
 			.setFutureExecutor(executor)
 			.build();
 
-		graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		return new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout());
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
index e29902e..67c3e8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -41,7 +42,7 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger {
 	@Test
 	public void testFailureWhileRestarting() throws Exception {
 		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(2));
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
@@ -72,7 +73,7 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger {
 	@Test
 	public void testSuppressRestartFailureWhileRestarting() throws Exception {
 		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
@@ -97,7 +98,7 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger {
 	@Test
 	public void testFailingScheduleOrUpdateConsumers() throws Exception {
 		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 3a0770e..c2db133 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -119,7 +120,7 @@ public class ExecutionTest extends TestLogger {
 			new NoRestartStrategy(),
 			jobVertex);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
 
@@ -181,7 +182,7 @@ public class ExecutionTest extends TestLogger {
 			new NoRestartStrategy(),
 			jobVertex);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
 
@@ -283,7 +284,7 @@ public class ExecutionTest extends TestLogger {
 			new NoRestartStrategy(),
 			jobVertex);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
 
@@ -393,7 +394,7 @@ public class ExecutionTest extends TestLogger {
 			new NoRestartStrategy(),
 			jobVertex);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
 
@@ -596,7 +597,7 @@ public class ExecutionTest extends TestLogger {
 			NettyShuffleMaster.INSTANCE,
 			partitionTracker);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
 		final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
index 15ff07d..9aaed7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
@@ -51,7 +53,7 @@ import static org.junit.Assert.assertTrue;
  */
 public class ExecutionVertexInputConstraintTest extends TestLogger {
 
-	private TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+	private ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 	@Test
 	public void testInputConsumable() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 7575dcd..0a7f742 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -201,7 +202,7 @@ public class FailoverRegionTest extends TestLogger {
 
 		enableCheckpointing(eg);
 
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
@@ -321,7 +322,7 @@ public class FailoverRegionTest extends TestLogger {
 			e.printStackTrace();
 			fail("Job failed with exception: " + e.getMessage());
 		}
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 		RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
 
@@ -482,7 +483,7 @@ public class FailoverRegionTest extends TestLogger {
 			slotProvider);
 
 		eg.attachJobGraph(ordered);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
 
@@ -582,7 +583,7 @@ public class FailoverRegionTest extends TestLogger {
 
 		enableCheckpointing(eg);
 
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 
 		attachPendingCheckpoints(eg);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
index df882ff..1f344fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -54,7 +55,7 @@ public class FinalizeOnMasterTest extends TestLogger {
 		vertex2.setParallelism(2);
 
 		final ExecutionGraph eg = createSimpleTestGraph(jid, vertex1, vertex2);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 		
@@ -79,7 +80,7 @@ public class FinalizeOnMasterTest extends TestLogger {
 		vertex.setParallelism(1);
 
 		final ExecutionGraph eg = createSimpleTestGraph(jid, vertex);
-		eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index e1b7da6..e47d489 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
@@ -171,7 +172,7 @@ public class GlobalModVersionTest extends TestLogger {
 			new CustomStrategy(failoverStrategy),
 			slotProvider);
 
-		graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
index 56a39c0..2771cb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.function.FunctionUtils;
 import org.apache.flink.util.function.SupplierWithException;
@@ -95,7 +96,7 @@ public class TestingComponentMainThreadExecutor {
 			this.innerExecutorService = Executors.newSingleThreadScheduledExecutor();
 			this.componentMainThreadTestExecutor =
 				new TestingComponentMainThreadExecutor(
-					TestingComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(innerExecutorService));
+					ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(innerExecutorService));
 		}
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutorServiceAdapter.java
deleted file mode 100644
index 7add060..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutorServiceAdapter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
-import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
-
-import javax.annotation.Nonnull;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * An implementation of {@link org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor} for tests.
- */
-public class TestingComponentMainThreadExecutorServiceAdapter extends ComponentMainThreadExecutorServiceAdapter {
-
-	public TestingComponentMainThreadExecutorServiceAdapter(
-		@Nonnull ScheduledExecutorService scheduledExecutorService,
-		@Nonnull Thread mainThread) {
-
-		super(scheduledExecutorService, () -> {
-			assert MainThreadValidatorUtil.isRunningInExpectedThread(mainThread);
-		});
-	}
-
-	public static TestingComponentMainThreadExecutorServiceAdapter forMainThread() {
-		final Thread main = Thread.currentThread();
-		return new TestingComponentMainThreadExecutorServiceAdapter(new DirectScheduledExecutorService() {
-			@Override
-			public void execute(Runnable command) {
-				assert MainThreadValidatorUtil.isRunningInExpectedThread(main);
-				super.execute(command);
-			}
-		}, main);
-	}
-
-	/**
-	 * Creates a test executor that delegates to the given {@link ScheduledExecutorService}. The given executor must
-	 * execute all submissions with the same thread.
-	 */
-	public static TestingComponentMainThreadExecutorServiceAdapter forSingleThreadExecutor(
-		@Nonnull ScheduledExecutorService singleThreadExecutor) {
-		Thread thread = CompletableFuture.supplyAsync(Thread::currentThread, singleThreadExecutor).join();
-		return new TestingComponentMainThreadExecutorServiceAdapter(singleThreadExecutor, thread);
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index fca13aa..873793f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -81,7 +81,7 @@ public class SchedulerTestBase extends TestLogger {
 
 		final JobMasterId jobMasterId = JobMasterId.generate();
 		final String jobManagerAddress = "localhost";
-		ComponentMainThreadExecutor executor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+		ComponentMainThreadExecutor executor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
 		slotPool.start(jobMasterId, jobManagerAddress, executor);
 		testingScheduler.start(executor);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
index 1ff1bc3..fcbbbb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
@@ -100,7 +100,7 @@ public class SlotPoolImplTest extends TestLogger {
 	private TestingResourceManagerGateway resourceManagerGateway;
 
 	private ComponentMainThreadExecutor mainThreadExecutor =
-		TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+		ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 	@Before
 	public void setUp() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
index ce8aedb..7762f3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 
@@ -46,7 +46,7 @@ public class SlotPoolResource extends ExternalResource {
 
 	public SlotPoolResource(@Nonnull SlotSelectionStrategy schedulingStrategy) {
 		this.schedulingStrategy = schedulingStrategy;
-		this.mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+		this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
 		slotPool = null;
 		testingResourceManagerGateway = null;
 	}