Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/05 14:29:51 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13540: [FLINK-19344] Fix DispatcherResourceCleanupTest race condition

tillrohrmann commented on a change in pull request #13540:
URL: https://github.com/apache/flink/pull/13540#discussion_r499630797



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
##########
@@ -39,6 +39,8 @@
 
 	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
 
+	private final CompletableFuture<Void> closeAsyncCalledFuture = new CompletableFuture<>();

Review comment:
       I would suggest using a `OneShotLatch`.
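
   A minimal sketch of what the latch-based variant could look like. To keep it self-contained it does not use Flink's `OneShotLatch`; `SimpleOneShotLatch` is a hypothetical stand-in built on `java.util.concurrent.CountDownLatch`, and `SketchRunner` and `getCloseAsyncCalledLatch` are illustrative names that model only the close signalling, not the real `TestingJobManagerRunner`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    /** Hypothetical one-shot latch; a stand-in, not Flink's OneShotLatch. */
    static final class SimpleOneShotLatch {
        private final CountDownLatch latch = new CountDownLatch(1);

        /** Releases all current and future waiters; further calls are no-ops. */
        void trigger() {
            latch.countDown();
        }

        /** Blocks until triggered or until the timeout elapses. */
        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }
    }

    /** Hypothetical runner that only records that closeAsync() was called. */
    static final class SketchRunner {
        private final SimpleOneShotLatch closeAsyncCalledLatch = new SimpleOneShotLatch();
        private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        CompletableFuture<Void> closeAsync() {
            // Signal the test thread before completing the termination future.
            closeAsyncCalledLatch.trigger();
            terminationFuture.complete(null);
            return terminationFuture;
        }

        SimpleOneShotLatch getCloseAsyncCalledLatch() {
            return closeAsyncCalledLatch;
        }
    }

    /** Calls closeAsync() from another thread and waits for the latch in this one. */
    static boolean demo() throws Exception {
        SketchRunner runner = new SketchRunner();
        Thread closer = new Thread(runner::closeAsync);
        closer.start();
        boolean triggered = runner.getCloseAsyncCalledLatch().await(5, TimeUnit.SECONDS);
        closer.join();
        return triggered;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("closeAsync observed: " + demo());
    }
}
```

   Compared with a `CompletableFuture<Void>`, a latch makes it explicit that the field only signals "this happened" and carries no result or failure.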

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -350,6 +350,10 @@ public void testJobSubmissionUnderSameJobId() throws Exception {
 		final TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
 		testingJobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
 
+		// wait until termination JobManagerRunner closeAsync has been called.
+		// this is necessary to avoid race conditions with completion of the 1st job and the submission of the 2nd job (DuplicateJobSubmissionException).
+		testingJobManagerRunner.getCloseAsyncCalledFuture().get();
+

Review comment:
       I agree that relying on the testing main thread enqueuing the `handleAsync` payload before the `submitJob` call is not a safe assumption. Hence, the fix should be fine. Still, I would like to understand what exactly is going wrong here.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -350,6 +350,10 @@ public void testJobSubmissionUnderSameJobId() throws Exception {
 		final TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
 		testingJobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
 
+		// wait until termination JobManagerRunner closeAsync has been called.
+		// this is necessary to avoid race conditions with completion of the 1st job and the submission of the 2nd job (DuplicateJobSubmissionException).
+		testingJobManagerRunner.getCloseAsyncCalledFuture().get();
+

Review comment:
       I don't fully understand why this additional synchronization step is necessary. If I am not mistaken, `testingJobManagerRunner.completeResultFutureExceptionally` won't trigger `Dispatcher.jobNotFinished` directly, but it will at least enqueue the `RunAsync` message, which puts this task into the mailbox of the `Dispatcher`. `dispatcherGateway.submitJob` should do the same, except that the submit message is enqueued after the `RunAsync` message.
   
   Could you show me an execution order in which the `submitJob` RPC call is executed before the `handleAsync` (https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L374)?
   
   Could you reproduce the problem locally?
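
   To make the ordering argument concrete, here is a small sketch under simplifying assumptions: a single-threaded executor stands in for the `Dispatcher` mailbox, and both messages are enqueued from the same test thread. It does not use Flink's RPC classes and does not reproduce the reported failure; `MailboxOrderSketch`, `mailbox`, and the recorded strings are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MailboxOrderSketch {

    /** Returns the order in which the two "messages" were processed by the mailbox thread. */
    static List<String> run() throws Exception {
        // Assumption: one thread with a FIFO queue models the Dispatcher mailbox.
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture<Void> resultFuture = new CompletableFuture<>();

            // Cleanup callback registered to run in the mailbox, like the handleAsync call.
            CompletableFuture<Void> cleanup = resultFuture.handleAsync((ignored, failure) -> {
                order.add("jobNotFinished");
                return null;
            }, mailbox);

            // Completing the future hands the callback to the mailbox from this thread...
            resultFuture.completeExceptionally(new RuntimeException("job not finished"));
            // ...so a message enqueued afterwards from the same thread lands behind it.
            CompletableFuture<Void> submit =
                    CompletableFuture.runAsync(() -> order.add("submitJob"), mailbox);

            CompletableFuture.allOf(cleanup, submit).get(5, TimeUnit.SECONDS);
            return new ArrayList<>(order);
        } finally {
            mailbox.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

   In this simplified model the cleanup always runs before the submission, which is why an execution order showing the opposite would help explain the failure.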



