You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/31 11:43:34 UTC
[flink] branch master updated: [FLINK-13487][tests] Wait until TE
has registered at JM
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3958cec [FLINK-13487][tests] Wait until TE has registered at JM
3958cec is described below
commit 3958cec0d1be3c2e4e9c48ec675085557384b4cd
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Wed Jul 31 19:43:16 2019 +0800
[FLINK-13487][tests] Wait until TE has registered at JM
---
.../taskexecutor/TaskExecutorPartitionLifecycleTest.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 490cbb0..f5999e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -255,10 +256,14 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
.build();
final CompletableFuture<Void> taskFinishedFuture = new CompletableFuture<>();
+ final OneShotLatch slotOfferedLatch = new OneShotLatch();
final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
.setRegisterTaskManagerFunction((s, location) -> CompletableFuture.completedFuture(new JMTMRegistrationSuccess(ResourceID.generate())))
- .setOfferSlotsFunction((resourceID, slotOffers) -> CompletableFuture.completedFuture(slotOffers))
+ .setOfferSlotsFunction((resourceID, slotOffers) -> {
+ slotOfferedLatch.trigger();
+ return CompletableFuture.completedFuture(slotOffers);
+ })
.setUpdateTaskExecutionStateFunction(taskExecutionState -> {
if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) {
taskFinishedFuture.complete(null);
@@ -327,6 +332,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
TestingInvokable.sync = new BlockerSync();
+ // Wait till the slot has been successfully offered before submitting the task.
+ // This ensures TM has been successfully registered to JM.
+ slotOfferedLatch.await();
+
taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterGateway.getFencingToken(), timeout)
.get();