You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/31 11:43:34 UTC

[flink] branch master updated: [FLINK-13487][tests] Wait until TE has registered at JM

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3958cec  [FLINK-13487][tests] Wait until TE has registered at JM
3958cec is described below

commit 3958cec0d1be3c2e4e9c48ec675085557384b4cd
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Wed Jul 31 19:43:16 2019 +0800

    [FLINK-13487][tests] Wait until TE has registered at JM
---
 .../taskexecutor/TaskExecutorPartitionLifecycleTest.java      | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 490cbb0..f5999e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -255,10 +256,14 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			.build();
 
 		final CompletableFuture<Void> taskFinishedFuture = new CompletableFuture<>();
+		final OneShotLatch slotOfferedLatch = new OneShotLatch();
 
 		final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
 			.setRegisterTaskManagerFunction((s, location) -> CompletableFuture.completedFuture(new JMTMRegistrationSuccess(ResourceID.generate())))
-			.setOfferSlotsFunction((resourceID, slotOffers) -> CompletableFuture.completedFuture(slotOffers))
+			.setOfferSlotsFunction((resourceID, slotOffers) -> {
+				slotOfferedLatch.trigger();
+				return CompletableFuture.completedFuture(slotOffers);
+			})
 			.setUpdateTaskExecutionStateFunction(taskExecutionState -> {
 				if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) {
 					taskFinishedFuture.complete(null);
@@ -327,6 +332,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 			TestingInvokable.sync = new BlockerSync();
 
+			// Wait till the slot has been successfully offered before submitting the task.
+			// This ensures TM has been successfully registered to JM.
+			slotOfferedLatch.await();
+
 			taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterGateway.getFencingToken(), timeout)
 				.get();