You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/09 14:47:36 UTC

[02/10] flink git commit: [FLINK-3779] [runtime] Add KvStateLocation lookup service

[FLINK-3779] [runtime] Add KvStateLocation lookup service

- Adds an Akka-based KvStateLocation lookup service to be used by the client
  to look up location information.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/775a7878
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/775a7878
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/775a7878

Branch: refs/heads/master
Commit: 775a78784b212bd1dedb9bcc21b61c4c21841209
Parents: af07eed
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon May 30 14:08:03 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Aug 9 16:42:05 2016 +0200

----------------------------------------------------------------------
 .../query/AkkaKvStateLocationLookupService.java | 320 ++++++++++++++++
 .../query/KvStateLocationLookupService.java     |  49 +++
 .../flink/runtime/query/UnknownJobManager.java  |  33 ++
 .../AkkaKvStateLocationLookupServiceTest.java   | 383 +++++++++++++++++++
 4 files changed, 785 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/775a7878/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
new file mode 100644
index 0000000..ed93b2a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.Recover;
+import akka.pattern.Patterns;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+/**
+ * Akka-based {@link KvStateLocationLookupService} that retrieves the current
+ * JobManager address via a {@link LeaderRetrievalService} and uses it for
+ * lookups.
+ *
+ * <p>While no JobManager is known (before the first leader notification or
+ * after a notification with a <code>null</code> leader address), lookups fail
+ * with {@link UnknownJobManager}. Such failures are retried as specified by
+ * the {@link LookupRetryStrategyFactory} the service was created with.
+ */
+class AkkaKvStateLocationLookupService implements KvStateLocationLookupService, LeaderRetrievalListener {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AkkaKvStateLocationLookupService.class);
+
+	/** Future returned when no JobManager is available. */
+	private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = Futures.failed(new UnknownJobManager());
+
+	/** Leader retrieval service to retrieve the current job manager. */
+	private final LeaderRetrievalService leaderRetrievalService;
+
+	/** The actor system used to resolve the JobManager address. */
+	private final ActorSystem actorSystem;
+
+	/** Timeout for JobManager ask-requests. */
+	private final FiniteDuration askTimeout;
+
+	/** Retry strategy factory on future failures. */
+	private final LookupRetryStrategyFactory retryStrategyFactory;
+
+	/**
+	 * Current job manager future. It is replaced by the leader retrieval
+	 * callbacks and read by every lookup; volatile so that lookups observe
+	 * the most recently written future.
+	 */
+	private volatile Future<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
+
+	/**
+	 * Creates the Akka-based {@link KvStateLocationLookupService}.
+	 *
+	 * @param leaderRetrievalService Leader retrieval service to use.
+	 * @param actorSystem            Actor system to use.
+	 * @param askTimeout             Timeout for JobManager ask-requests.
+	 * @param retryStrategyFactory   Retry strategy if no JobManager available.
+	 * @throws NullPointerException If any argument is <code>null</code>.
+	 */
+	AkkaKvStateLocationLookupService(
+			LeaderRetrievalService leaderRetrievalService,
+			ActorSystem actorSystem,
+			FiniteDuration askTimeout,
+			LookupRetryStrategyFactory retryStrategyFactory) {
+
+		this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
+		this.actorSystem = Preconditions.checkNotNull(actorSystem, "Actor system");
+		this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask Timeout");
+		this.retryStrategyFactory = Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory");
+	}
+
+	/**
+	 * Starts the leader retrieval service with this instance as its listener.
+	 *
+	 * @throws RuntimeException If the leader retrieval service fails to start.
+	 */
+	@Override
+	public void start() {
+		try {
+			leaderRetrievalService.start(this);
+		} catch (Exception e) {
+			LOG.error("Failed to start leader retrieval service", e);
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Stops the leader retrieval service.
+	 *
+	 * @throws RuntimeException If the leader retrieval service fails to stop.
+	 */
+	@Override
+	public void shutDown() {
+		try {
+			leaderRetrievalService.stop();
+		} catch (Exception e) {
+			LOG.error("Failed to stop leader retrieval service", e);
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>Each call creates a fresh {@link LookupRetryStrategy} from the
+	 * configured factory, so retry budgets are not shared between lookups.
+	 */
+	@Override
+	public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, final String registrationName) {
+		return getKvStateLookupInfo(jobId, registrationName, retryStrategyFactory.createRetryStrategy());
+	}
+
+	/**
+	 * Returns a future holding the {@link KvStateLocation} for the given job
+	 * and KvState registration name.
+	 *
+	 * <p>If there is currently no JobManager registered with the service, the
+	 * request is retried. The retry behaviour is specified by the
+	 * {@link LookupRetryStrategy} of the lookup service.
+	 *
+	 * @param jobId               JobID the KvState instance belongs to
+	 * @param registrationName    Name under which the KvState has been registered
+	 * @param lookupRetryStrategy Retry strategy to use for retries on UnknownJobManager failures.
+	 * @return Future holding the {@link KvStateLocation}
+	 */
+	@SuppressWarnings("unchecked")
+	private Future<KvStateLocation> getKvStateLookupInfo(
+			final JobID jobId,
+			final String registrationName,
+			final LookupRetryStrategy lookupRetryStrategy) {
+
+		return jobManagerFuture
+				.flatMap(new Mapper<ActorGateway, Future<Object>>() {
+					@Override
+					public Future<Object> apply(ActorGateway jobManager) {
+						// Lookup the KvStateLocation
+						Object msg = new KvStateMessage.LookupKvStateLocation(jobId, registrationName);
+						return jobManager.ask(msg, askTimeout);
+					}
+				}, actorSystem.dispatcher())
+				// Fails the future if the JobManager responds with another type
+				.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
+				.recoverWith(new Recover<Future<KvStateLocation>>() {
+					@Override
+					public Future<KvStateLocation> recover(Throwable failure) throws Throwable {
+						// If the Future fails with UnknownJobManager, retry
+						// the request. Otherwise all Futures will be failed
+						// during the start up phase, when the JobManager did
+						// not notify this service yet or leadership is lost
+						// intermittently.
+						if (failure instanceof UnknownJobManager && lookupRetryStrategy.tryRetry()) {
+							return Patterns.after(
+									lookupRetryStrategy.getRetryDelay(),
+									actorSystem.scheduler(),
+									actorSystem.dispatcher(),
+									new Callable<Future<KvStateLocation>>() {
+										@Override
+										public Future<KvStateLocation> call() throws Exception {
+											// Re-reads jobManagerFuture, picking up any
+											// leader notification received meanwhile
+											return getKvStateLookupInfo(
+													jobId,
+													registrationName,
+													lookupRetryStrategy);
+										}
+									});
+						} else {
+							return Futures.failed(failure);
+						}
+					}
+				}, actorSystem.dispatcher());
+	}
+
+	/**
+	 * Updates the JobManager future on leader changes.
+	 *
+	 * <p>A <code>null</code> leader address resets the future to fail with
+	 * {@link UnknownJobManager}. Otherwise the address is resolved to an actor
+	 * and wrapped in a gateway that uses the given leader session ID.
+	 */
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Received leader address notification {}:{}", leaderAddress, leaderSessionID);
+		}
+
+		if (leaderAddress == null) {
+			jobManagerFuture = UNKNOWN_JOB_MANAGER;
+		} else {
+			jobManagerFuture = AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout)
+					.map(new Mapper<ActorRef, ActorGateway>() {
+						@Override
+						public ActorGateway apply(ActorRef actorRef) {
+							return new AkkaActorGateway(actorRef, leaderSessionID);
+						}
+					}, actorSystem.dispatcher());
+		}
+	}
+
+	/**
+	 * Fails all subsequent lookups with the given exception. Such lookups
+	 * are not retried, because the failure is not an {@link UnknownJobManager}.
+	 */
+	@Override
+	public void handleError(Exception exception) {
+		LOG.error("Received error from leader retrieval service", exception);
+		jobManagerFuture = Futures.failed(exception);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Retry strategy for failed lookups.
+	 *
+	 * <p>Usage:
+	 * <pre>
+	 * LookupRetryStrategy retryStrategy = retryStrategyFactory.createRetryStrategy();
+	 *
+	 * if (retryStrategy.tryRetry()) {
+	 *     // OK to retry
+	 *     FiniteDuration retryDelay = retryStrategy.getRetryDelay();
+	 * }
+	 * </pre>
+	 */
+	interface LookupRetryStrategy {
+
+		/**
+		 * Returns the delay to wait before the next retry.
+		 *
+		 * @return Current retry delay.
+		 */
+		FiniteDuration getRetryDelay();
+
+		/**
+		 * Tries another retry and returns whether it is allowed or not.
+		 *
+		 * @return Whether it is allowed to do another retry or not.
+		 */
+		boolean tryRetry();
+
+	}
+
+	/**
+	 * Factory for retry strategies.
+	 */
+	interface LookupRetryStrategyFactory {
+
+		/**
+		 * Creates a new retry strategy.
+		 *
+		 * @return The retry strategy.
+		 */
+		LookupRetryStrategy createRetryStrategy();
+
+	}
+
+	/**
+	 * Factory for disabled retries. The returned (shared, stateless) strategy
+	 * never allows a retry.
+	 */
+	static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
+
+		private static final DisabledLookupRetryStrategy RETRY_STRATEGY = new DisabledLookupRetryStrategy();
+
+		@Override
+		public LookupRetryStrategy createRetryStrategy() {
+			return RETRY_STRATEGY;
+		}
+
+		private static class DisabledLookupRetryStrategy implements LookupRetryStrategy {
+
+			@Override
+			public FiniteDuration getRetryDelay() {
+				return FiniteDuration.Zero();
+			}
+
+			@Override
+			public boolean tryRetry() {
+				return false;
+			}
+		}
+
+	}
+
+	/**
+	 * Factory for fixed delay retries. Each created strategy allows up to
+	 * <code>maxRetries</code> retries, each after <code>retryDelay</code>.
+	 */
+	static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
+
+		private final int maxRetries;
+		private final FiniteDuration retryDelay;
+
+		/**
+		 * Creates the factory.
+		 *
+		 * @param maxRetries Maximum number of retries per lookup (non-negative).
+		 * @param retryDelay Delay before each retry.
+		 * @throws IllegalArgumentException If <code>maxRetries</code> is negative.
+		 * @throws NullPointerException     If <code>retryDelay</code> is <code>null</code>.
+		 */
+		FixedDelayLookupRetryStrategyFactory(int maxRetries, FiniteDuration retryDelay) {
+			// Validate eagerly instead of failing on every createRetryStrategy() call
+			Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
+			this.maxRetries = maxRetries;
+			this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
+		}
+
+		@Override
+		public LookupRetryStrategy createRetryStrategy() {
+			return new FixedDelayLookupRetryStrategy(maxRetries, retryDelay);
+		}
+
+		private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy {
+
+			private final Object retryLock = new Object();
+			private final int maxRetries;
+			private final FiniteDuration retryDelay;
+
+			/** Number of granted retries; guarded by retryLock. */
+			private int numRetries;
+
+			public FixedDelayLookupRetryStrategy(int maxRetries, FiniteDuration retryDelay) {
+				Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
+				this.maxRetries = maxRetries;
+				this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
+			}
+
+			@Override
+			public FiniteDuration getRetryDelay() {
+				// Final field, safe to read without the lock
+				return retryDelay;
+			}
+
+			@Override
+			public boolean tryRetry() {
+				synchronized (retryLock) {
+					if (numRetries < maxRetries) {
+						numRetries++;
+						return true;
+					} else {
+						return false;
+					}
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/775a7878/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
new file mode 100644
index 0000000..cce432e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import scala.concurrent.Future;
+
+/**
+ * Service to look up the {@link KvStateLocation} of a KvState instance that
+ * has been registered under a name for a job.
+ *
+ * <p>The service is started via {@link #start()} and shut down via
+ * {@link #shutDown()}.
+ */
+public interface KvStateLocationLookupService {
+
+	/**
+	 * Starts this lookup service.
+	 */
+	void start();
+
+	/**
+	 * Shuts this lookup service down.
+	 */
+	void shutDown();
+
+	/**
+	 * Asynchronously looks up where the KvState instance registered under
+	 * the given name for the given job is located.
+	 *
+	 * @param jobId            ID of the job the KvState instance belongs to
+	 * @param registrationName Name the KvState instance was registered under
+	 * @return Future completed with the {@link KvStateLocation} of the instance
+	 */
+	Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String registrationName);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/775a7878/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java
new file mode 100644
index 0000000..3549ed6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+/**
+ * Exception used to fail a lookup {@code Future} when no JobManager is
+ * currently registered at the {@link KvStateLocationLookupService}.
+ */
+class UnknownJobManager extends Exception {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Fixed detail message of every instance. */
+	private static final String MESSAGE =
+			"Unknown JobManager. Either the JobManager has not registered yet or has lost leadership.";
+
+	/**
+	 * Creates the exception with a message stating that the JobManager has
+	 * either not registered yet or has lost leadership.
+	 */
+	public UnknownJobManager() {
+		super(MESSAGE);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/775a7878/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
new file mode 100644
index 0000000..e9950fb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategy;
+import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
+import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
+import org.apache.flink.util.Preconditions;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AkkaKvStateLocationLookupServiceTest {
+
+	/** The default timeout. */
+	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+
+	/** Test actor system shared between the tests. */
+	private static ActorSystem testActorSystem;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (testActorSystem != null) {
+			testActorSystem.shutdown();
+		}
+	}
+
+	/**
+	 * Tests responses if no leader notification has been reported or leadership
+	 * has been lost (leaderAddress = <code>null</code>).
+	 */
+	@Test
+	public void testNoJobManagerRegistered() throws Exception {
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+		lookupService.start();
+
+		try {
+			//
+			// No leader registered initially => fail with UnknownJobManager
+			//
+			try {
+				JobID jobId = new JobID();
+				String name = "coffee";
+
+				Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(jobId, name);
+
+				Await.result(locationFuture, TIMEOUT);
+				fail("Did not throw expected Exception");
+			} catch (UnknownJobManager ignored) {
+				// Expected
+			}
+
+			assertEquals("Received unexpected lookup", 0, received.size());
+
+			//
+			// Leader registration => communicate with new leader
+			//
+			UUID leaderSessionId = null;
+			KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
+
+			ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected);
+
+			String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+			// Notify the service about a leader
+			leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId);
+
+			JobID jobId = new JobID();
+			String name = "tea";
+
+			// Verify that the leader response is handled
+			KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT);
+			assertEquals(expected, location);
+
+			// Verify that the correct message was sent to the leader
+			assertEquals(1, received.size());
+
+			verifyLookupMsg(received.poll(), jobId, name);
+
+			//
+			// Leader loss => fail with UnknownJobManager
+			//
+			leaderRetrievalService.notifyListener(null, null);
+
+			try {
+				Future<KvStateLocation> locationFuture = lookupService
+						.getKvStateLookupInfo(new JobID(), "coffee");
+
+				Await.result(locationFuture, TIMEOUT);
+				fail("Did not throw expected Exception");
+			} catch (UnknownJobManager ignored) {
+				// Expected
+			}
+
+			// No new messages received
+			assertEquals(0, received.size());
+		} finally {
+			lookupService.shutDown();
+		}
+	}
+
+	/**
+	 * Tests that messages are properly decorated with the leader session ID.
+	 */
+	@Test
+	public void testLeaderSessionIdChange() throws Exception {
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+		lookupService.start();
+
+		try {
+			// Create test actors with random leader session IDs
+			KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt");
+			UUID leaderSessionId1 = UUID.randomUUID();
+			ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1);
+			String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1);
+
+			KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper");
+			UUID leaderSessionId2 = UUID.randomUUID();
+			// The second actor must expect the session ID it is announced with
+			ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId2, expected2);
+			String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2);
+
+			JobID jobId = new JobID();
+
+			//
+			// Notify about first leader
+			//
+			leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1);
+
+			KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
+			assertEquals(expected1, location);
+
+			assertEquals(1, received.size());
+			verifyLookupMsg(received.poll(), jobId, "rock");
+
+			//
+			// Notify about second leader
+			//
+			leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2);
+
+			location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
+			assertEquals(expected2, location);
+
+			assertEquals(1, received.size());
+			verifyLookupMsg(received.poll(), jobId, "roll");
+		} finally {
+			lookupService.shutDown();
+		}
+	}
+
+	/**
+	 * Tests that lookups are retried when no leader notification is available.
+	 */
+	@Test
+	public void testRetryOnUnknownJobManager() throws Exception {
+		final Queue<LookupRetryStrategy> retryStrategies = new LinkedBlockingQueue<>();
+
+		LookupRetryStrategyFactory retryStrategy =
+				new LookupRetryStrategyFactory() {
+					@Override
+					public LookupRetryStrategy createRetryStrategy() {
+						return retryStrategies.poll();
+					}
+				};
+
+		final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				retryStrategy);
+
+		lookupService.start();
+
+		try {
+			//
+			// Test call to retry
+			//
+			final AtomicBoolean hasRetried = new AtomicBoolean();
+			retryStrategies.add(
+					new LookupRetryStrategy() {
+						@Override
+						public FiniteDuration getRetryDelay() {
+							return FiniteDuration.Zero();
+						}
+
+						@Override
+						public boolean tryRetry() {
+							// Allow exactly one retry
+							return hasRetried.compareAndSet(false, true);
+						}
+					});
+
+			Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(new JobID(), "yessir");
+
+			Await.ready(locationFuture, TIMEOUT);
+			assertTrue("Did not retry ", hasRetried.get());
+
+			//
+			// Test leader notification after retry
+			//
+			Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+			KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic");
+			ActorRef testActor = LookupResponseActor.create(received, null, expected);
+			final String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+			retryStrategies.add(new LookupRetryStrategy() {
+				@Override
+				public FiniteDuration getRetryDelay() {
+					return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+				}
+
+				@Override
+				public boolean tryRetry() {
+					// The leader becomes known while the retry is pending
+					leaderRetrievalService.notifyListener(testActorAddress, null);
+					return true;
+				}
+			});
+
+			KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT);
+			assertEquals(expected, location);
+		} finally {
+			lookupService.shutDown();
+		}
+	}
+
+	/**
+	 * Tests that a response of an unexpected type fails the lookup.
+	 */
+	@Test
+	public void testUnexpectedResponseType() throws Exception {
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+		lookupService.start();
+
+		try {
+			// Create a test actor that responds with a String instead of a KvStateLocation
+			String expected = "unexpected-response-type";
+			ActorRef testActor = LookupResponseActor.create(received, null, expected);
+			String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+			leaderRetrievalService.notifyListener(testActorAddress, null);
+
+			JobID jobId = new JobID();
+
+			try {
+				Await.result(lookupService.getKvStateLookupInfo(jobId, "spicy"), TIMEOUT);
+				fail("Did not throw expected Exception");
+			} catch (ClassCastException ignored) {
+				// Expected: the String response cannot be mapped to a KvStateLocation.
+				// Catching Throwable here would also swallow the AssertionError of fail().
+			}
+
+			// The lookup itself reached the leader
+			assertEquals(1, received.size());
+			verifyLookupMsg(received.poll(), jobId, "spicy");
+		} finally {
+			lookupService.shutDown();
+		}
+	}
+
+	private static final class LookupResponseActor extends FlinkUntypedActor {
+
+		/** Received lookup messages */
+		private final Queue<LookupKvStateLocation> receivedLookups;
+
+		/** Responses on KvStateMessage.LookupKvStateLocation messages */
+		private final Queue<Object> lookupResponses;
+
+		/** The leader session ID */
+		private UUID leaderSessionId;
+
+		public LookupResponseActor(
+				Queue<LookupKvStateLocation> receivedLookups,
+				UUID leaderSessionId, Object... lookupResponses) {
+
+			this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups");
+			this.leaderSessionId = leaderSessionId;
+			this.lookupResponses = new ArrayDeque<>();
+
+			if (lookupResponses != null) {
+				for (Object resp : lookupResponses) {
+					this.lookupResponses.add(resp);
+				}
+			}
+		}
+
+		@Override
+		public void handleMessage(Object message) throws Exception {
+			if (message instanceof LookupKvStateLocation) {
+				// Add to received lookups queue
+				receivedLookups.add((LookupKvStateLocation) message);
+
+				// Reply with the next configured response (if any)
+				Object msg = lookupResponses.poll();
+				if (msg != null) {
+					if (msg instanceof Throwable) {
+						sender().tell(new Status.Failure((Throwable) msg), self());
+					} else {
+						sender().tell(new Status.Success(msg), self());
+					}
+				}
+			} else if (message instanceof UUID) {
+				this.leaderSessionId = (UUID) message;
+			} else {
+				LOG.debug("Received unhandled message: {}", message);
+			}
+		}
+
+		@Override
+		protected UUID getLeaderSessionID() {
+			return leaderSessionId;
+		}
+
+		private static ActorRef create(
+				Queue<LookupKvStateLocation> receivedLookups,
+				UUID leaderSessionId,
+				Object... lookupResponses) {
+
+			return testActorSystem.actorOf(Props.create(
+					LookupResponseActor.class,
+					receivedLookups,
+					leaderSessionId,
+					lookupResponses));
+		}
+	}
+
+	private static void verifyLookupMsg(
+			LookupKvStateLocation lookUpMsg,
+			JobID expectedJobId,
+			String expectedName) {
+
+		assertNotNull(lookUpMsg);
+		assertEquals(expectedJobId, lookUpMsg.getJobId());
+		assertEquals(expectedName, lookUpMsg.getRegistrationName());
+	}
+
+}