You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/14 16:07:58 UTC
[5/5] flink git commit: [FLINK-9573] Extend
LeaderElectionService#hasLeadership to take leader session id
[FLINK-9573] Extend LeaderElectionService#hasLeadership to take leader session id
The new LeaderElectionService#hasLeadership also takes the leader session id and verifies whether
this is the correct leader session id associated with the leadership.
This closes #6154.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/363de6b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/363de6b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/363de6b6
Branch: refs/heads/master
Commit: 363de6b643689f64564270857f1daf7f6c59257f
Parents: 467ad78
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 12 14:24:59 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 14 15:28:03 2018 +0200
----------------------------------------------------------------------
.../flink/docs/rest/RestAPIDocGenerator.java | 4 +-
.../nonha/embedded/EmbeddedLeaderService.java | 7 +-
.../SingleLeaderElectionService.java | 5 +-
.../runtime/jobmaster/JobManagerRunner.java | 2 +-
.../leaderelection/LeaderElectionService.java | 9 +-
.../StandaloneLeaderElectionService.java | 6 +-
.../ZooKeeperLeaderElectionService.java | 8 +-
.../flink/runtime/jobmanager/JobManager.scala | 6 +-
.../leaderelection/LeaderElectionTest.java | 245 +++++++++++++++++++
.../TestingLeaderElectionService.java | 8 +-
.../testingUtils/TestingJobManagerLike.scala | 2 +-
11 files changed, 282 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index e69cc7e..cebde56 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -54,6 +54,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -369,7 +371,7 @@ public class RestAPIDocGenerator {
}
@Override
- public boolean hasLeadership() {
+ public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
index f89cd2c..dafefcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@@ -62,7 +63,7 @@ public class EmbeddedLeaderService {
private EmbeddedLeaderElectionService currentLeaderConfirmed;
/** fencing UID for the current leader (or proposed leader). */
- private UUID currentLeaderSessionId;
+ private volatile UUID currentLeaderSessionId;
/** the cached address of the current leader. */
private String currentLeaderAddress;
@@ -356,8 +357,8 @@ public class EmbeddedLeaderService {
}
@Override
- public boolean hasLeadership() {
- return isLeader;
+ public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+ return isLeader && leaderSessionId.equals(currentLeaderSessionId);
}
void shutdown(Exception cause) {
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
index a56b077..bb7f44b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.util.HashSet;
@@ -162,9 +163,9 @@ public class SingleLeaderElectionService implements LeaderElectionService {
}
@Override
- public boolean hasLeadership() {
+ public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
synchronized (lock) {
- return leader != null;
+ return proposedLeader != null && leaderSessionId.equals(leaderId);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index f04e4af..d867ab3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -340,7 +340,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
}
private void confirmLeaderSessionIdIfStillLeader(UUID leaderSessionId, CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture) {
- if (leaderElectionService.hasLeadership()) {
+ if (leaderElectionService.hasLeadership(leaderSessionId)) {
currentLeaderGatewayFuture.complete(jobMaster.getSelfGateway(JobMasterGateway.class));
leaderElectionService.confirmLeaderSessionID(leaderSessionId);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
index 6cba141..10f2f68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.leaderelection;
+import javax.annotation.Nonnull;
+
import java.util.UUID;
/**
@@ -62,10 +64,11 @@ public interface LeaderElectionService {
/**
* Returns true if the {@link LeaderContender} with which the service has been started owns
- * currently the leadership.
+ * currently the leadership under the given leader session id.
+ *
+ * @param leaderSessionId identifying the current leader
*
* @return true if the associated {@link LeaderContender} is the leader, otherwise false
*/
- boolean hasLeadership();
-
+ boolean hasLeadership(@Nonnull UUID leaderSessionId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
index a956a5e..ec997a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
+
import java.util.UUID;
/**
@@ -58,7 +60,7 @@ public class StandaloneLeaderElectionService implements LeaderElectionService {
public void confirmLeaderSessionID(UUID leaderSessionID) {}
@Override
- public boolean hasLeadership() {
- return true;
+ public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+ return (contender != null && HighAvailabilityServices.DEFAULT_LEADER_ID.equals(leaderSessionId));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index dc0f3ae..87684c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -37,6 +37,8 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
@@ -66,7 +68,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
/** ZooKeeper path of the node which stores the current leader information. */
private final String leaderPath;
- private UUID issuedLeaderSessionID;
+ private volatile UUID issuedLeaderSessionID;
private volatile UUID confirmedLeaderSessionID;
@@ -205,8 +207,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
}
@Override
- public boolean hasLeadership() {
- return leaderLatch.hasLeadership();
+ public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+ return leaderLatch.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4e4eb4c..cebff58 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -490,7 +490,8 @@ class JobManager(
Option(submittedJobGraphOption) match {
case Some(submittedJobGraph) =>
- if (!leaderElectionService.hasLeadership()) {
+ if (leaderSessionID.isEmpty ||
+ !leaderElectionService.hasLeadership(leaderSessionID.get)) {
// we've lost leadership. mission: abort.
log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
} else {
@@ -1381,7 +1382,8 @@ class JobManager(
jobInfo.notifyClients(
decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))
- if (leaderElectionService.hasLeadership) {
+ if (leaderSessionID.isDefined &&
+ leaderElectionService.hasLeadership(leaderSessionID.get)) {
// There is a small chance that multiple job managers schedule the same job after if
// they try to recover at the same time. This will eventually be noticed, but can not be
// ruled out from the beginning.
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
new file mode 100644
index 0000000..44016f8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for leader election.
+ */
+@RunWith(Parameterized.class)
+public class LeaderElectionTest extends TestLogger {
+
+ enum LeaderElectionType {
+ ZooKeeper,
+ Embedded,
+ Standalone
+ }
+
+ @Parameterized.Parameters(name = "Leader election: {0}")
+ public static Collection<LeaderElectionType> parameters () {
+ return Arrays.asList(LeaderElectionType.values());
+ }
+
+ private final ServiceClass serviceClass;
+
+ public LeaderElectionTest(LeaderElectionType leaderElectionType) {
+ switch(leaderElectionType) {
+ case ZooKeeper:
+ serviceClass = new ZooKeeperServiceClass();
+ break;
+ case Embedded:
+ serviceClass = new EmbeddedServiceClass();
+ break;
+ case Standalone:
+ serviceClass = new StandaloneServiceClass();
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unknown leader election type: %s.", leaderElectionType));
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ serviceClass.setup();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ serviceClass.teardown();
+ }
+
+ @Test
+ public void testHasLeadership() throws Exception {
+ final LeaderElectionService leaderElectionService = serviceClass.createLeaderElectionService();
+ final ManualLeaderContender manualLeaderContender = new ManualLeaderContender();
+
+ try {
+ assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()), is(false));
+
+ leaderElectionService.start(manualLeaderContender);
+
+ final UUID leaderSessionId = manualLeaderContender.waitForLeaderSessionId();
+
+ assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(true));
+ assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()), is(false));
+
+ leaderElectionService.confirmLeaderSessionID(leaderSessionId);
+
+ assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(true));
+
+ leaderElectionService.stop();
+
+ assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(false));
+ } finally {
+ manualLeaderContender.rethrowError();
+ }
+ }
+
+ private static final class ManualLeaderContender implements LeaderContender {
+
+ private static final UUID NULL_LEADER_SESSION_ID = new UUID(0L, 0L);
+
+ private final ArrayBlockingQueue<UUID> leaderSessionIds = new ArrayBlockingQueue<>(10);
+
+ private volatile Exception exception;
+
+ @Override
+ public void grantLeadership(UUID leaderSessionID) {
+ leaderSessionIds.offer(leaderSessionID);
+ }
+
+ @Override
+ public void revokeLeadership() {
+ leaderSessionIds.offer(NULL_LEADER_SESSION_ID);
+ }
+
+ @Override
+ public String getAddress() {
+ return "foobar";
+ }
+
+ @Override
+ public void handleError(Exception exception) {
+ this.exception = exception;
+ }
+
+ void rethrowError() throws Exception {
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ UUID waitForLeaderSessionId() throws InterruptedException {
+ return leaderSessionIds.take();
+ }
+ }
+
+ private interface ServiceClass {
+ void setup() throws Exception;
+
+ void teardown() throws Exception;
+
+ LeaderElectionService createLeaderElectionService() throws Exception;
+ }
+
+ private static final class ZooKeeperServiceClass implements ServiceClass {
+
+ private TestingServer testingServer;
+
+ private CuratorFramework client;
+
+ private Configuration configuration;
+
+ @Override
+ public void setup() throws Exception {
+ try {
+ testingServer = new TestingServer();
+ } catch (Exception e) {
+ throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
+ }
+
+ configuration = new Configuration();
+
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+ configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+ client = ZooKeeperUtils.startCuratorFramework(configuration);
+ }
+
+ @Override
+ public void teardown() throws Exception {
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
+ if (testingServer != null) {
+ testingServer.stop();
+ testingServer = null;
+ }
+ }
+
+ @Override
+ public LeaderElectionService createLeaderElectionService() throws Exception {
+ return ZooKeeperUtils.createLeaderElectionService(client, configuration);
+ }
+ }
+
+ private static final class EmbeddedServiceClass implements ServiceClass {
+ private EmbeddedLeaderService embeddedLeaderService;
+
+ @Override
+ public void setup() {
+ embeddedLeaderService = new EmbeddedLeaderService(TestingUtils.defaultExecutionContext());
+ }
+
+ @Override
+ public void teardown() {
+ if (embeddedLeaderService != null) {
+ embeddedLeaderService.shutdown();
+ embeddedLeaderService = null;
+ }
+ }
+
+ @Override
+ public LeaderElectionService createLeaderElectionService() throws Exception {
+ return embeddedLeaderService.createLeaderElectionService();
+ }
+ }
+
+ private static final class StandaloneServiceClass implements ServiceClass {
+
+ @Override
+ public void setup() throws Exception {
+ // noop
+ }
+
+ @Override
+ public void teardown() throws Exception {
+ // noop
+ }
+
+ @Override
+ public LeaderElectionService createLeaderElectionService() throws Exception {
+ return new StandaloneLeaderElectionService();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index 4ecb9b61..25f97c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.leaderelection;
+import javax.annotation.Nonnull;
+
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -30,6 +32,7 @@ public class TestingLeaderElectionService implements LeaderElectionService {
private LeaderContender contender;
private boolean hasLeadership = false;
private CompletableFuture<UUID> confirmationFuture = null;
+ private UUID issuedLeaderSessionId = null;
/**
* Gets a future that completes when leadership is confirmed.
@@ -58,8 +61,8 @@ public class TestingLeaderElectionService implements LeaderElectionService {
}
@Override
- public synchronized boolean hasLeadership() {
- return hasLeadership;
+ public synchronized boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+ return hasLeadership && leaderSessionId.equals(issuedLeaderSessionId);
}
public synchronized CompletableFuture<UUID> isLeader(UUID leaderSessionID) {
@@ -68,6 +71,7 @@ public class TestingLeaderElectionService implements LeaderElectionService {
}
confirmationFuture = new CompletableFuture<>();
hasLeadership = true;
+ issuedLeaderSessionId = leaderSessionID;
contender.grantLeadership(leaderSessionID);
return confirmationFuture;
http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 2c49d0f..0640f39 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -409,7 +409,7 @@ trait TestingJobManagerLike extends FlinkActor {
}
case NotifyWhenLeader =>
- if (leaderElectionService.hasLeadership) {
+ if (leaderSessionID.isDefined && leaderElectionService.hasLeadership(leaderSessionID.get)) {
sender() ! true
} else {
waitForLeader += sender()