You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/07/07 14:42:22 UTC
[flink] 03/03: [hotfix][runtime] Merge
TestingComponentMainThreadExecutorServiceAdapter and
ComponentMainThreadExecutorServiceAdapter
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 21468e0050dc5f97de5cfe39885e0d3fd648e399
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Jul 4 18:07:22 2019 +0200
[hotfix][runtime] Merge TestingComponentMainThreadExecutorServiceAdapter and ComponentMainThreadExecutorServiceAdapter
Merge TestingComponentMainThreadExecutorServiceAdapter with
ComponentMainThreadExecutorServiceAdapter because it only adds static methods.
Move ComponentMainThreadExecutorServiceAdapter to test sources
This closes #8989.
---
.../ExecutionGraphCheckpointCoordinatorTest.java | 4 +-
.../ComponentMainThreadExecutorServiceAdapter.java | 41 ++++++++++----
.../executiongraph/ArchivedExecutionGraphTest.java | 3 +-
...ncurrentFailoverStrategyExecutionGraphTest.java | 4 +-
.../ExecutionGraphCoLocationRestartTest.java | 3 +-
.../ExecutionGraphDeploymentTest.java | 11 ++--
.../executiongraph/ExecutionGraphMetricsTest.java | 4 +-
.../ExecutionGraphPartitionReleaseTest.java | 3 +-
.../executiongraph/ExecutionGraphRestartTest.java | 6 +-
.../ExecutionGraphSchedulingTest.java | 15 ++---
.../executiongraph/ExecutionGraphSuspendTest.java | 5 +-
.../executiongraph/ExecutionGraphTestUtils.java | 3 +-
.../ExecutionGraphVariousFailuesTest.java | 7 ++-
.../runtime/executiongraph/ExecutionTest.java | 11 ++--
.../ExecutionVertexInputConstraintTest.java | 4 +-
.../runtime/executiongraph/FailoverRegionTest.java | 9 +--
.../executiongraph/FinalizeOnMasterTest.java | 5 +-
.../executiongraph/GlobalModVersionTest.java | 3 +-
.../TestingComponentMainThreadExecutor.java | 3 +-
...gComponentMainThreadExecutorServiceAdapter.java | 64 ----------------------
.../jobmanager/scheduler/SchedulerTestBase.java | 4 +-
.../jobmaster/slotpool/SlotPoolImplTest.java | 4 +-
.../jobmaster/slotpool/SlotPoolResource.java | 4 +-
23 files changed, 97 insertions(+), 123 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 267b685..5cd1dba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -152,7 +152,7 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
VoidBlobWriter.getInstance(),
timeout);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
100,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
similarity index 72%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
index e3b0690..b255d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
@@ -19,14 +19,16 @@
package org.apache.flink.runtime.concurrent;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+
+import javax.annotation.Nonnull;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* Adapter class for a {@link ScheduledExecutorService} or {@link ScheduledExecutor} which shall be used as a
* {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the
@@ -41,23 +43,38 @@ public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainT
public ComponentMainThreadExecutorServiceAdapter(
final ScheduledExecutorService scheduledExecutorService,
- final Runnable mainThreadCheck) {
- this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThreadCheck);
+ final Thread mainThread) {
+ this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThread);
}
public ComponentMainThreadExecutorServiceAdapter(
- final ScheduledExecutor scheduledExecutorService,
+ final ScheduledExecutor scheduledExecutor,
final Thread mainThread) {
- this(scheduledExecutorService, () -> {
+ this.scheduledExecutor = scheduledExecutor;
+ this.mainThreadCheck = () -> {
assert MainThreadValidatorUtil.isRunningInExpectedThread(mainThread);
- });
+ };
}
- private ComponentMainThreadExecutorServiceAdapter(
- final ScheduledExecutor scheduledExecutor,
- final Runnable mainThreadCheck) {
- this.scheduledExecutor = checkNotNull(scheduledExecutor);
- this.mainThreadCheck = checkNotNull(mainThreadCheck);
+ public static ComponentMainThreadExecutor forMainThread() {
+ final Thread main = Thread.currentThread();
+ return new ComponentMainThreadExecutorServiceAdapter(new DirectScheduledExecutorService() {
+ @Override
+ public void execute(Runnable command) {
+ assert MainThreadValidatorUtil.isRunningInExpectedThread(main);
+ super.execute(command);
+ }
+ }, main);
+ }
+
+ /**
+ * Creates a test executor that delegates to the given {@link ScheduledExecutorService}. The given executor must
+ * execute all submissions with the same thread.
+ */
+ public static ComponentMainThreadExecutor forSingleThreadExecutor(
+ @Nonnull ScheduledExecutorService singleThreadExecutor) {
+ final Thread thread = CompletableFuture.supplyAsync(Thread::currentThread, singleThreadExecutor).join();
+ return new ComponentMainThreadExecutorServiceAdapter(singleThreadExecutor, thread);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 0d481d6..72d8234 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -113,7 +114,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
new NoRestartStrategy(),
mock(SlotProvider.class));
- runtimeGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ runtimeGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
runtimeGraph.attachJobGraph(vertices);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
index 64c9ae4..3198dc1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -86,7 +88,7 @@ import static org.mockito.Mockito.when;
*/
public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger {
- private final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+ private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
/**
* Tests that a cancellation concurrent to a local failover leads to a properly
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
index d32e2d6..e76bc47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -74,7 +75,7 @@ public class ExecutionGraphCoLocationRestartTest extends SchedulerTestBase {
// enable the queued scheduling for the slot pool
eg.setQueuedSchedulingAllowed(true);
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
assertEquals(JobStatus.CREATED, eg.getState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 7ad1dbb..bf2b40d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -186,7 +187,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
blobWriter,
AkkaUtils.getDefaultTimeout());
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
checkJobOffloaded(eg);
@@ -473,7 +474,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
blobWriter,
AkkaUtils.getDefaultTimeout());
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
checkJobOffloaded(eg);
@@ -556,7 +557,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
AkkaUtils.getDefaultTimeout());
checkJobOffloaded(eg);
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.setQueuedSchedulingAllowed(false);
@@ -636,7 +637,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
sourceVertex,
sinkVertex);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
executionGraph.setScheduleMode(ScheduleMode.EAGER);
executionGraph.scheduleForExecution();
@@ -726,7 +727,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
executionGraph.setScheduleMode(ScheduleMode.EAGER);
executionGraph.setQueuedSchedulingAllowed(true);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
executionGraph.scheduleForExecution();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 878a736..72d3416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
@@ -49,7 +51,7 @@ import static org.junit.Assert.assertTrue;
public class ExecutionGraphMetricsTest extends TestLogger {
- private final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+ private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
/**
* This test tests that the restarting time metric correctly displays restarting times.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index 0110642..7d2369f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -60,7 +61,7 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger {
private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private static final TestingComponentMainThreadExecutor mainThreadExecutor =
new TestingComponentMainThreadExecutor(
- TestingComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));
+ ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));
@Test
public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index e78fa6f..70df983 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -101,8 +103,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
- private static final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor =
- TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+ private static final ComponentMainThreadExecutor mainThreadExecutor =
+ ComponentMainThreadExecutorServiceAdapter.forMainThread();
private static final JobID TEST_JOB_ID = new JobID();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 9ceaf84..bdbcf94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -131,7 +132,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
// set up two TaskManager gateways and slots
@@ -231,7 +232,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
//
// kick off the scheduling
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.setScheduleMode(ScheduleMode.EAGER);
eg.setQueuedSchedulingAllowed(true);
eg.scheduleForExecution();
@@ -318,7 +319,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
slotProvider.addSlots(targetVertex.getID(), targetFutures);
final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
//
// we complete some of the futures
@@ -398,7 +399,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
slotFutures[1].complete(slots[1]);
// kick off the scheduling
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.setScheduleMode(ScheduleMode.EAGER);
eg.setQueuedSchedulingAllowed(true);
eg.scheduleForExecution();
@@ -442,7 +443,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
slotProvider.addSlots(jobVertex.getID(), new CompletableFuture[]{slotFuture1, slotFuture2});
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
executionGraph.scheduleForExecution();
final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
@@ -490,7 +491,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
executionGraph.scheduleForExecution();
assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
@@ -539,7 +540,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
final Set<SlotRequestId> slotRequestIdsToReturn = ConcurrentHashMap.newKeySet(slotRequestIds.size());
executionGraph.scheduleForExecution();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 2d5dd67..01e2146 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
@@ -221,7 +222,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
@Test
public void testSuspendWhileRestarting() throws Exception {
final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.scheduleForExecution();
assertEquals(JobStatus.RUNNING, eg.getState());
@@ -296,7 +297,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
slotProvider,
new FixedDelayRestartStrategy(0, 0),
vertex);
- simpleTestGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ simpleTestGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
return simpleTestGraph;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 8623eda..3143074 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -448,7 +449,7 @@ public class ExecutionGraphTestUtils {
.setFutureExecutor(executor)
.build();
- graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
return new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout());
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
index e29902e..67c3e8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -41,7 +42,7 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger {
@Test
public void testFailureWhileRestarting() throws Exception {
final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(2));
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.scheduleForExecution();
assertEquals(JobStatus.RUNNING, eg.getState());
@@ -72,7 +73,7 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger {
@Test
public void testSuppressRestartFailureWhileRestarting() throws Exception {
final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.scheduleForExecution();
assertEquals(JobStatus.RUNNING, eg.getState());
@@ -97,7 +98,7 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger {
@Test
public void testFailingScheduleOrUpdateConsumers() throws Exception {
final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.scheduleForExecution();
assertEquals(JobStatus.RUNNING, eg.getState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 3a0770e..c2db133 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -119,7 +120,7 @@ public class ExecutionTest extends TestLogger {
new NoRestartStrategy(),
jobVertex);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
@@ -181,7 +182,7 @@ public class ExecutionTest extends TestLogger {
new NoRestartStrategy(),
jobVertex);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
@@ -283,7 +284,7 @@ public class ExecutionTest extends TestLogger {
new NoRestartStrategy(),
jobVertex);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
@@ -393,7 +394,7 @@ public class ExecutionTest extends TestLogger {
new NoRestartStrategy(),
jobVertex);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
@@ -596,7 +597,7 @@ public class ExecutionTest extends TestLogger {
NettyShuffleMaster.INSTANCE,
partitionTracker);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
index 15ff07d..9aaed7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
@@ -51,7 +53,7 @@ import static org.junit.Assert.assertTrue;
*/
public class ExecutionVertexInputConstraintTest extends TestLogger {
- private TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+ private ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
@Test
public void testInputConsumable() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 7575dcd..0a7f742 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -201,7 +202,7 @@ public class FailoverRegionTest extends TestLogger {
enableCheckpointing(eg);
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.scheduleForExecution();
assertEquals(JobStatus.RUNNING, eg.getState());
@@ -321,7 +322,7 @@ public class FailoverRegionTest extends TestLogger {
e.printStackTrace();
fail("Job failed with exception: " + e.getMessage());
}
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.scheduleForExecution();
RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
@@ -482,7 +483,7 @@ public class FailoverRegionTest extends TestLogger {
slotProvider);
eg.attachJobGraph(ordered);
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
@@ -582,7 +583,7 @@ public class FailoverRegionTest extends TestLogger {
enableCheckpointing(eg);
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.scheduleForExecution();
attachPendingCheckpoints(eg);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
index df882ff..1f344fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -54,7 +55,7 @@ public class FinalizeOnMasterTest extends TestLogger {
vertex2.setParallelism(2);
final ExecutionGraph eg = createSimpleTestGraph(jid, vertex1, vertex2);
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.scheduleForExecution();
assertEquals(JobStatus.RUNNING, eg.getState());
@@ -79,7 +80,7 @@ public class FinalizeOnMasterTest extends TestLogger {
vertex.setParallelism(1);
final ExecutionGraph eg = createSimpleTestGraph(jid, vertex);
- eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
eg.scheduleForExecution();
assertEquals(JobStatus.RUNNING, eg.getState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index e1b7da6..e47d489 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
@@ -171,7 +172,7 @@ public class GlobalModVersionTest extends TestLogger {
new CustomStrategy(failoverStrategy),
slotProvider);
- graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
JobVertex jv = new JobVertex("test vertex");
jv.setInvokableClass(NoOpInvokable.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
index 56a39c0..2771cb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.SupplierWithException;
@@ -95,7 +96,7 @@ public class TestingComponentMainThreadExecutor {
this.innerExecutorService = Executors.newSingleThreadScheduledExecutor();
this.componentMainThreadTestExecutor =
new TestingComponentMainThreadExecutor(
- TestingComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(innerExecutorService));
+ ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(innerExecutorService));
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutorServiceAdapter.java
deleted file mode 100644
index 7add060..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutorServiceAdapter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
-import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
-
-import javax.annotation.Nonnull;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * An implementation of {@link org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor} for tests.
- */
-public class TestingComponentMainThreadExecutorServiceAdapter extends ComponentMainThreadExecutorServiceAdapter {
-
- public TestingComponentMainThreadExecutorServiceAdapter(
- @Nonnull ScheduledExecutorService scheduledExecutorService,
- @Nonnull Thread mainThread) {
-
- super(scheduledExecutorService, () -> {
- assert MainThreadValidatorUtil.isRunningInExpectedThread(mainThread);
- });
- }
-
- public static TestingComponentMainThreadExecutorServiceAdapter forMainThread() {
- final Thread main = Thread.currentThread();
- return new TestingComponentMainThreadExecutorServiceAdapter(new DirectScheduledExecutorService() {
- @Override
- public void execute(Runnable command) {
- assert MainThreadValidatorUtil.isRunningInExpectedThread(main);
- super.execute(command);
- }
- }, main);
- }
-
- /**
- * Creates a test executor that delegates to the given {@link ScheduledExecutorService}. The given executor must
- * execute all submissions with the same thread.
- */
- public static TestingComponentMainThreadExecutorServiceAdapter forSingleThreadExecutor(
- @Nonnull ScheduledExecutorService singleThreadExecutor) {
- Thread thread = CompletableFuture.supplyAsync(Thread::currentThread, singleThreadExecutor).join();
- return new TestingComponentMainThreadExecutorServiceAdapter(singleThreadExecutor, thread);
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index fca13aa..873793f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -81,7 +81,7 @@ public class SchedulerTestBase extends TestLogger {
final JobMasterId jobMasterId = JobMasterId.generate();
final String jobManagerAddress = "localhost";
- ComponentMainThreadExecutor executor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+ ComponentMainThreadExecutor executor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
slotPool.start(jobMasterId, jobManagerAddress, executor);
testingScheduler.start(executor);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
index 1ff1bc3..fcbbbb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
@@ -100,7 +100,7 @@ public class SlotPoolImplTest extends TestLogger {
private TestingResourceManagerGateway resourceManagerGateway;
private ComponentMainThreadExecutor mainThreadExecutor =
- TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+ ComponentMainThreadExecutorServiceAdapter.forMainThread();
@Before
public void setUp() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
index ce8aedb..7762f3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
@@ -46,7 +46,7 @@ public class SlotPoolResource extends ExternalResource {
public SlotPoolResource(@Nonnull SlotSelectionStrategy schedulingStrategy) {
this.schedulingStrategy = schedulingStrategy;
- this.mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+ this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
slotPool = null;
testingResourceManagerGateway = null;
}