You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/04/04 16:34:34 UTC
flink git commit: [FLINK-3693] [tests] Wait for task manager to
register before submitting job
Repository: flink
Updated Branches:
refs/heads/master 21480e29a -> 9e7c6645f
[FLINK-3693] [tests] Wait for task manager to register before submitting job
- This test could fail when the job was submitted before the task manager
connects to the leading job manager (see [1])
[1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/120072682/log.txt
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e7c6645
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e7c6645
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e7c6645
Branch: refs/heads/master
Commit: 9e7c6645f0f0e88cdfd092d72e810cd52352ca63
Parents: 21480e2
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Apr 4 14:33:23 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Apr 4 16:34:17 2016 +0200
----------------------------------------------------------------------
.../test/recovery/JobManagerHAJobGraphRecoveryITCase.java | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9e7c6645/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index bd83e52..2418853 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -60,6 +60,8 @@ import org.junit.Test;
import scala.Option;
import scala.Some;
import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
@@ -298,6 +300,14 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
leaderAddress, testSystem, deadline.timeLeft());
ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
+ int numSlots = 0;
+ while (numSlots == 0) {
+ Future<?> slotsFuture = leader.ask(JobManagerMessages
+ .getRequestTotalNumberOfSlots(), deadline.timeLeft());
+
+ numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft());
+ }
+
// Submit the job in non-detached mode
leader.tell(new SubmitJob(jobGraph,
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);