You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:10 UTC

[03/50] [abbrv] flink git commit: [FLINK-6293] [tests] Harden JobManagerITCase

[FLINK-6293] [tests] Harden JobManagerITCase

One of the unit tests in JobManagerITCase starts a MiniCluster and sends a
LeaderSessionMessage to the JobManager without waiting until the JobManager
has gained leadership. This can lead to a dropped TriggerSavepoint message
which will cause the test to deadlock.

This PR fixes the problem by explicitly waiting for the JobManager to become
the leader.

This closes #3796.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3da8f69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3da8f69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3da8f69

Branch: refs/heads/table-retraction
Commit: f3da8f69e99be49068ab4ea3abc5e1c4eba7bf32
Parents: 7c35dc0
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Apr 28 10:04:57 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 28 15:25:44 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/jobmanager/JobManagerITCase.scala  | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3da8f69/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index ce8517e..5fb9ddf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -765,6 +765,11 @@ class JobManagerITCase(_system: ActorSystem)
           val jobManager = flinkCluster
             .getLeaderGateway(deadline.timeLeft)
 
+          // we have to make sure that the job manager knows also that he is the leader
+          // in case of standalone leader retrieval this can happen after the getLeaderGateway call
+          val leaderFuture = jobManager.ask(NotifyWhenLeader, timeout.duration)
+          Await.ready(leaderFuture, timeout.duration)
+
           val jobId = new JobID()
 
           // Trigger savepoint for non-existing job