You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/14 16:07:58 UTC

[5/5] flink git commit: [FLINK-9573] Extend LeaderElectionService#hasLeadership to take leader session id

[FLINK-9573] Extend LeaderElectionService#hasLeadership to take leader session id

The new LeaderElectionService#hasLeadership also takes the leader session id and verifies whether
this is the correct leader session id associated with the leadership.

This closes #6154.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/363de6b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/363de6b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/363de6b6

Branch: refs/heads/master
Commit: 363de6b643689f64564270857f1daf7f6c59257f
Parents: 467ad78
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 12 14:24:59 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 14 15:28:03 2018 +0200

----------------------------------------------------------------------
 .../flink/docs/rest/RestAPIDocGenerator.java    |   4 +-
 .../nonha/embedded/EmbeddedLeaderService.java   |   7 +-
 .../SingleLeaderElectionService.java            |   5 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   2 +-
 .../leaderelection/LeaderElectionService.java   |   9 +-
 .../StandaloneLeaderElectionService.java        |   6 +-
 .../ZooKeeperLeaderElectionService.java         |   8 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../leaderelection/LeaderElectionTest.java      | 245 +++++++++++++++++++
 .../TestingLeaderElectionService.java           |   8 +-
 .../testingUtils/TestingJobManagerLike.scala    |   2 +-
 11 files changed, 282 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index e69cc7e..cebde56 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -54,6 +54,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -369,7 +371,7 @@ public class RestAPIDocGenerator {
 			}
 
 			@Override
-			public boolean hasLeadership() {
+			public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
 				return false;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
index f89cd2c..dafefcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -62,7 +63,7 @@ public class EmbeddedLeaderService {
 	private EmbeddedLeaderElectionService currentLeaderConfirmed;
 
 	/** fencing UID for the current leader (or proposed leader). */
-	private UUID currentLeaderSessionId;
+	private volatile UUID currentLeaderSessionId;
 
 	/** the cached address of the current leader. */
 	private String currentLeaderAddress;
@@ -356,8 +357,8 @@ public class EmbeddedLeaderService {
 		}
 
 		@Override
-		public boolean hasLeadership() {
-			return isLeader;
+		public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+			return isLeader && leaderSessionId.equals(currentLeaderSessionId);
 		}
 
 		void shutdown(Exception cause) {

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
index a56b077..bb7f44b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import java.util.HashSet;
@@ -162,9 +163,9 @@ public class SingleLeaderElectionService implements LeaderElectionService {
 	}
 
 	@Override
-	public boolean hasLeadership() {
+	public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
 		synchronized (lock) {
-			return leader != null;
+			return proposedLeader != null && leaderSessionId.equals(leaderId);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index f04e4af..d867ab3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -340,7 +340,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 	}
 
 	private void confirmLeaderSessionIdIfStillLeader(UUID leaderSessionId, CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture) {
-		if (leaderElectionService.hasLeadership()) {
+		if (leaderElectionService.hasLeadership(leaderSessionId)) {
 			currentLeaderGatewayFuture.complete(jobMaster.getSelfGateway(JobMasterGateway.class));
 			leaderElectionService.confirmLeaderSessionID(leaderSessionId);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
index 6cba141..10f2f68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import javax.annotation.Nonnull;
+
 import java.util.UUID;
 
 /**
@@ -62,10 +64,11 @@ public interface LeaderElectionService {
 
 	/**
 	 * Returns true if the {@link LeaderContender} with which the service has been started owns
-	 * currently the leadership.
+	 * currently the leadership under the given leader session id.
+	 *
+	 * @param leaderSessionId identifying the current leader
 	 *
 	 * @return true if the associated {@link LeaderContender} is the leader, otherwise false
 	 */
-	boolean hasLeadership();
-
+	boolean hasLeadership(@Nonnull UUID leaderSessionId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
index a956a5e..ec997a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.util.UUID;
 
 /**
@@ -58,7 +60,7 @@ public class StandaloneLeaderElectionService implements LeaderElectionService {
 	public void confirmLeaderSessionID(UUID leaderSessionID) {}
 
 	@Override
-	public boolean hasLeadership() {
-		return true;
+	public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+		return (contender != null && HighAvailabilityServices.DEFAULT_LEADER_ID.equals(leaderSessionId));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index dc0f3ae..87684c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -37,6 +37,8 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
@@ -66,7 +68,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 	/** ZooKeeper path of the node which stores the current leader information. */
 	private final String leaderPath;
 
-	private UUID issuedLeaderSessionID;
+	private volatile UUID issuedLeaderSessionID;
 
 	private volatile UUID confirmedLeaderSessionID;
 
@@ -205,8 +207,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 	}
 
 	@Override
-	public boolean hasLeadership() {
-		return leaderLatch.hasLeadership();
+	public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+		return leaderLatch.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4e4eb4c..cebff58 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -490,7 +490,8 @@ class JobManager(
 
             Option(submittedJobGraphOption) match {
               case Some(submittedJobGraph) =>
-                if (!leaderElectionService.hasLeadership()) {
+                if (leaderSessionID.isEmpty ||
+                  !leaderElectionService.hasLeadership(leaderSessionID.get)) {
                   // we've lost leadership. mission: abort.
                   log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
                 } else {
@@ -1381,7 +1382,8 @@ class JobManager(
           jobInfo.notifyClients(
             decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))
 
-          if (leaderElectionService.hasLeadership) {
+          if (leaderSessionID.isDefined &&
+            leaderElectionService.hasLeadership(leaderSessionID.get)) {
             // There is a small chance that multiple job managers schedule the same job after if
             // they try to recover at the same time. This will eventually be noticed, but can not be
             // ruled out from the beginning.

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
new file mode 100644
index 0000000..44016f8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for leader election.
+ */
+@RunWith(Parameterized.class)
+public class LeaderElectionTest extends TestLogger {
+
+	enum LeaderElectionType {
+		ZooKeeper,
+		Embedded,
+		Standalone
+	}
+
+	@Parameterized.Parameters(name = "Leader election: {0}")
+	public static Collection<LeaderElectionType> parameters () {
+		return Arrays.asList(LeaderElectionType.values());
+	}
+
+	private final ServiceClass serviceClass;
+
+	public LeaderElectionTest(LeaderElectionType leaderElectionType) {
+		switch(leaderElectionType) {
+			case ZooKeeper:
+				serviceClass = new ZooKeeperServiceClass();
+				break;
+			case Embedded:
+				serviceClass = new EmbeddedServiceClass();
+				break;
+			case Standalone:
+				serviceClass = new StandaloneServiceClass();
+				break;
+			default:
+				throw new IllegalArgumentException(String.format("Unknown leader election type: %s.", leaderElectionType));
+		}
+	}
+
+	@Before
+	public void setup() throws Exception {
+		serviceClass.setup();
+	}
+
+	@After
+	public void teardown() throws Exception {
+		serviceClass.teardown();
+	}
+
+	@Test
+	public void testHasLeadership() throws Exception {
+		final LeaderElectionService leaderElectionService = serviceClass.createLeaderElectionService();
+		final ManualLeaderContender manualLeaderContender = new ManualLeaderContender();
+
+		try {
+			assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()), is(false));
+
+			leaderElectionService.start(manualLeaderContender);
+
+			final UUID leaderSessionId = manualLeaderContender.waitForLeaderSessionId();
+
+			assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(true));
+			assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()), is(false));
+
+			leaderElectionService.confirmLeaderSessionID(leaderSessionId);
+
+			assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(true));
+
+			leaderElectionService.stop();
+
+			assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(false));
+		} finally {
+			manualLeaderContender.rethrowError();
+		}
+	}
+
+	private static final class ManualLeaderContender implements LeaderContender {
+
+		private static final UUID NULL_LEADER_SESSION_ID = new UUID(0L, 0L);
+
+		private final ArrayBlockingQueue<UUID> leaderSessionIds = new ArrayBlockingQueue<>(10);
+
+		private volatile Exception exception;
+
+		@Override
+		public void grantLeadership(UUID leaderSessionID) {
+			leaderSessionIds.offer(leaderSessionID);
+		}
+
+		@Override
+		public void revokeLeadership() {
+			leaderSessionIds.offer(NULL_LEADER_SESSION_ID);
+		}
+
+		@Override
+		public String getAddress() {
+			return "foobar";
+		}
+
+		@Override
+		public void handleError(Exception exception) {
+			this.exception = exception;
+		}
+
+		void rethrowError() throws Exception {
+			if (exception != null) {
+				throw exception;
+			}
+		}
+
+		UUID waitForLeaderSessionId() throws InterruptedException {
+			return leaderSessionIds.take();
+		}
+	}
+
+	private interface ServiceClass {
+		void setup() throws Exception;
+
+		void teardown() throws Exception;
+
+		LeaderElectionService createLeaderElectionService() throws Exception;
+	}
+
+	private static final class ZooKeeperServiceClass implements ServiceClass {
+
+		private TestingServer testingServer;
+
+		private CuratorFramework client;
+
+		private Configuration configuration;
+
+		@Override
+		public void setup() throws Exception {
+			try {
+				testingServer = new TestingServer();
+			} catch (Exception e) {
+				throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
+			}
+
+			configuration = new Configuration();
+
+			configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+			configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+			client = ZooKeeperUtils.startCuratorFramework(configuration);
+		}
+
+		@Override
+		public void teardown() throws Exception {
+			if (client != null) {
+				client.close();
+				client = null;
+			}
+
+			if (testingServer != null) {
+				testingServer.stop();
+				testingServer = null;
+			}
+		}
+
+		@Override
+		public LeaderElectionService createLeaderElectionService() throws Exception {
+			return ZooKeeperUtils.createLeaderElectionService(client, configuration);
+		}
+	}
+
+	private static final class EmbeddedServiceClass implements ServiceClass {
+		private EmbeddedLeaderService embeddedLeaderService;
+
+		@Override
+		public void setup() {
+			embeddedLeaderService = new EmbeddedLeaderService(TestingUtils.defaultExecutionContext());
+		}
+
+		@Override
+		public void teardown() {
+			if (embeddedLeaderService != null) {
+				embeddedLeaderService.shutdown();
+				embeddedLeaderService = null;
+			}
+		}
+
+		@Override
+		public LeaderElectionService createLeaderElectionService() throws Exception {
+			return embeddedLeaderService.createLeaderElectionService();
+		}
+	}
+
+	private static final class StandaloneServiceClass implements ServiceClass {
+
+		@Override
+		public void setup() throws Exception {
+			// noop
+		}
+
+		@Override
+		public void teardown() throws Exception {
+			// noop
+		}
+
+		@Override
+		public LeaderElectionService createLeaderElectionService() throws Exception {
+			return new StandaloneLeaderElectionService();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index 4ecb9b61..25f97c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import javax.annotation.Nonnull;
+
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
@@ -30,6 +32,7 @@ public class TestingLeaderElectionService implements LeaderElectionService {
 	private LeaderContender contender;
 	private boolean hasLeadership = false;
 	private CompletableFuture<UUID> confirmationFuture = null;
+	private UUID issuedLeaderSessionId = null;
 
 	/**
 	 * Gets a future that completes when leadership is confirmed.
@@ -58,8 +61,8 @@ public class TestingLeaderElectionService implements LeaderElectionService {
 	}
 
 	@Override
-	public synchronized boolean hasLeadership() {
-		return hasLeadership;
+	public synchronized boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+		return hasLeadership && leaderSessionId.equals(issuedLeaderSessionId);
 	}
 
 	public synchronized CompletableFuture<UUID> isLeader(UUID leaderSessionID) {
@@ -68,6 +71,7 @@ public class TestingLeaderElectionService implements LeaderElectionService {
 		}
 		confirmationFuture = new CompletableFuture<>();
 		hasLeadership = true;
+		issuedLeaderSessionId = leaderSessionID;
 		contender.grantLeadership(leaderSessionID);
 
 		return confirmationFuture;

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 2c49d0f..0640f39 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -409,7 +409,7 @@ trait TestingJobManagerLike extends FlinkActor {
       }
 
     case NotifyWhenLeader =>
-      if (leaderElectionService.hasLeadership) {
+      if (leaderSessionID.isDefined && leaderElectionService.hasLeadership(leaderSessionID.get)) {
         sender() ! true
       } else {
         waitForLeader += sender()