You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2020/06/13 13:22:00 UTC

[flink] branch release-1.11 updated: [FLINK-15687][runtime][test] Fix test instability due to concurrent access to JobTable.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 15e7c26  [FLINK-15687][runtime][test] Fix test instability due to concurrent access to JobTable.
15e7c26 is described below

commit 15e7c26ab6b9c9b2b1faa81495dc941e2f10bac6
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Jun 12 10:30:35 2020 +0800

    [FLINK-15687][runtime][test] Fix test instability due to concurrent access to JobTable.
    
    This closes #12623.
---
 .../TaskExecutorPartitionLifecycleTest.java        | 16 ++++++----
 .../TaskSubmissionTestEnvironment.java             | 37 ++++++++++++++--------
 2 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9a3afa1..578ac92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -165,13 +165,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			}).build();
 
 		final DefaultJobTable jobTable = DefaultJobTable.create();
-		TaskSubmissionTestEnvironment.registerJobMasterConnection(
-			jobTable,
-			jobId,
-			rpc,
-			jobMasterGateway,
-			new NoOpTaskManagerActions(),
-			timeout);
 
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setJobTable(jobTable)
@@ -196,6 +189,15 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			taskExecutor.start();
 			taskExecutor.waitUntilStarted();
 
+			TaskSubmissionTestEnvironment.registerJobMasterConnection(
+				jobTable,
+				jobId,
+				rpc,
+				jobMasterGateway,
+				new NoOpTaskManagerActions(),
+				timeout,
+				taskExecutor.getMainThreadExecutableForTesting());
+
 			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
 
 			trackerIsTrackingPartitions.set(true);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index e79b720..d7fcd02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -170,7 +171,14 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 			taskManagerActions = testTaskManagerActions;
 		}
 
-		registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout);
+		registerJobMasterConnection(
+			jobTable,
+			jobId,
+			testingRpcService,
+			jobMasterGateway,
+			taskManagerActions,
+			timeout,
+			taskExecutor.getMainThreadExecutableForTesting());
 	}
 
 	static void registerJobMasterConnection(
@@ -179,18 +187,21 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 			RpcService testingRpcService,
 			JobMasterGateway jobMasterGateway,
 			TaskManagerActions taskManagerActions,
-			Time timeout) {
-		final JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> TestingJobServices.newBuilder().build());
-		job.connect(
-			ResourceID.generate(),
-			jobMasterGateway,
-			taskManagerActions,
-			new TestCheckpointResponder(),
-			new TestGlobalAggregateManager(),
-			new RpcResultPartitionConsumableNotifier(jobMasterGateway, testingRpcService.getExecutor(), timeout),
-			TestingPartitionProducerStateChecker.newBuilder()
-				.setPartitionProducerStateFunction((jobID, intermediateDataSetID, resultPartitionID) -> CompletableFuture.completedFuture(ExecutionState.RUNNING))
-				.build());
+			Time timeout,
+			MainThreadExecutable mainThreadExecutable) {
+		mainThreadExecutable.runAsync(() -> {
+			final JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> TestingJobServices.newBuilder().build());
+			job.connect(
+				ResourceID.generate(),
+				jobMasterGateway,
+				taskManagerActions,
+				new TestCheckpointResponder(),
+				new TestGlobalAggregateManager(),
+				new RpcResultPartitionConsumableNotifier(jobMasterGateway, testingRpcService.getExecutor(), timeout),
+				TestingPartitionProducerStateChecker.newBuilder()
+					.setPartitionProducerStateFunction((jobID, intermediateDataSetID, resultPartitionID) -> CompletableFuture.completedFuture(ExecutionState.RUNNING))
+					.build());
+		});
 	}
 
 	public TestingTaskExecutor getTaskExecutor() {