You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/04/04 16:34:34 UTC

flink git commit: [FLINK-3693] [tests] Wait for task manager to register before submitting job

Repository: flink
Updated Branches:
  refs/heads/master 21480e29a -> 9e7c6645f


[FLINK-3693] [tests] Wait for task manager to register before submitting job

- This test could fail when the job was submitted before the task manager
  connects to the leading job manager (see [1])

[1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/120072682/log.txt


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e7c6645
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e7c6645
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e7c6645

Branch: refs/heads/master
Commit: 9e7c6645f0f0e88cdfd092d72e810cd52352ca63
Parents: 21480e2
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Apr 4 14:33:23 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Apr 4 16:34:17 2016 +0200

----------------------------------------------------------------------
 .../test/recovery/JobManagerHAJobGraphRecoveryITCase.java | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e7c6645/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index bd83e52..2418853 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -60,6 +60,8 @@ import org.junit.Test;
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -298,6 +300,14 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 						leaderAddress, testSystem, deadline.timeLeft());
 				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
 
+				int numSlots = 0;
+				while (numSlots == 0) {
+					Future<?> slotsFuture = leader.ask(JobManagerMessages
+							.getRequestTotalNumberOfSlots(), deadline.timeLeft());
+
+					numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft());
+				}
+
 				// Submit the job in non-detached mode
 				leader.tell(new SubmitJob(jobGraph,
 						ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);