You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2020/06/13 13:22:00 UTC
[flink] branch release-1.11 updated: [FLINK-15687][runtime][test]
Fix test instability due to concurrent access to JobTable.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 15e7c26 [FLINK-15687][runtime][test] Fix test instability due to concurrent access to JobTable.
15e7c26 is described below
commit 15e7c26ab6b9c9b2b1faa81495dc941e2f10bac6
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Jun 12 10:30:35 2020 +0800
[FLINK-15687][runtime][test] Fix test instability due to concurrent access to JobTable.
This closes #12623.
---
.../TaskExecutorPartitionLifecycleTest.java | 16 ++++++----
.../TaskSubmissionTestEnvironment.java | 37 ++++++++++++++--------
2 files changed, 33 insertions(+), 20 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9a3afa1..578ac92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -165,13 +165,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
}).build();
final DefaultJobTable jobTable = DefaultJobTable.create();
- TaskSubmissionTestEnvironment.registerJobMasterConnection(
- jobTable,
- jobId,
- rpc,
- jobMasterGateway,
- new NoOpTaskManagerActions(),
- timeout);
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setJobTable(jobTable)
@@ -196,6 +189,15 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
taskExecutor.start();
taskExecutor.waitUntilStarted();
+ TaskSubmissionTestEnvironment.registerJobMasterConnection(
+ jobTable,
+ jobId,
+ rpc,
+ jobMasterGateway,
+ new NoOpTaskManagerActions(),
+ timeout,
+ taskExecutor.getMainThreadExecutableForTesting());
+
final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
trackerIsTrackingPartitions.set(true);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index e79b720..d7fcd02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -170,7 +171,14 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
taskManagerActions = testTaskManagerActions;
}
- registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout);
+ registerJobMasterConnection(
+ jobTable,
+ jobId,
+ testingRpcService,
+ jobMasterGateway,
+ taskManagerActions,
+ timeout,
+ taskExecutor.getMainThreadExecutableForTesting());
}
static void registerJobMasterConnection(
@@ -179,18 +187,21 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
RpcService testingRpcService,
JobMasterGateway jobMasterGateway,
TaskManagerActions taskManagerActions,
- Time timeout) {
- final JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> TestingJobServices.newBuilder().build());
- job.connect(
- ResourceID.generate(),
- jobMasterGateway,
- taskManagerActions,
- new TestCheckpointResponder(),
- new TestGlobalAggregateManager(),
- new RpcResultPartitionConsumableNotifier(jobMasterGateway, testingRpcService.getExecutor(), timeout),
- TestingPartitionProducerStateChecker.newBuilder()
- .setPartitionProducerStateFunction((jobID, intermediateDataSetID, resultPartitionID) -> CompletableFuture.completedFuture(ExecutionState.RUNNING))
- .build());
+ Time timeout,
+ MainThreadExecutable mainThreadExecutable) {
+ mainThreadExecutable.runAsync(() -> {
+ final JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> TestingJobServices.newBuilder().build());
+ job.connect(
+ ResourceID.generate(),
+ jobMasterGateway,
+ taskManagerActions,
+ new TestCheckpointResponder(),
+ new TestGlobalAggregateManager(),
+ new RpcResultPartitionConsumableNotifier(jobMasterGateway, testingRpcService.getExecutor(), timeout),
+ TestingPartitionProducerStateChecker.newBuilder()
+ .setPartitionProducerStateFunction((jobID, intermediateDataSetID, resultPartitionID) -> CompletableFuture.completedFuture(ExecutionState.RUNNING))
+ .build());
+ });
}
public TestingTaskExecutor getTaskExecutor() {