You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/09 14:47:36 UTC
[02/10] flink git commit: [FLINK-3779] [runtime] Add KvStateLocation lookup service
[FLINK-3779] [runtime] Add KvStateLocation lookup service
- Adds an Akka-based KvStateLocation lookup service to be used by the client
to look up location information.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/775a7878
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/775a7878
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/775a7878
Branch: refs/heads/master
Commit: 775a78784b212bd1dedb9bcc21b61c4c21841209
Parents: af07eed
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon May 30 14:08:03 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Aug 9 16:42:05 2016 +0200
----------------------------------------------------------------------
.../query/AkkaKvStateLocationLookupService.java | 320 ++++++++++++++++
.../query/KvStateLocationLookupService.java | 49 +++
.../flink/runtime/query/UnknownJobManager.java | 33 ++
.../AkkaKvStateLocationLookupServiceTest.java | 383 +++++++++++++++++++
4 files changed, 785 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/775a7878/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
new file mode 100644
index 0000000..ed93b2a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.Recover;
+import akka.pattern.Patterns;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+/**
+ * Akka-based {@link KvStateLocationLookupService} that retrieves the current
+ * JobManager address and uses it for lookups.
+ *
+ * <p>The service registers itself as a {@link LeaderRetrievalListener} when it
+ * is started. Every leader notification replaces the cached JobManager future.
+ * Lookups failing with {@link UnknownJobManager} (no leader known yet or
+ * leadership lost) are retried as dictated by the configured
+ * {@link LookupRetryStrategyFactory}; all other failures are passed through.
+ */
+class AkkaKvStateLocationLookupService implements KvStateLocationLookupService, LeaderRetrievalListener {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AkkaKvStateLocationLookupService.class);
+
+	/** Future returned when no JobManager is available. */
+	private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = Futures.failed(new UnknownJobManager());
+
+	/** Leader retrieval service to retrieve the current job manager. */
+	private final LeaderRetrievalService leaderRetrievalService;
+
+	/** The actor system used to resolve the JobManager address. */
+	private final ActorSystem actorSystem;
+
+	/** Timeout for JobManager ask-requests. */
+	private final FiniteDuration askTimeout;
+
+	/** Retry strategy factory on future failures. */
+	private final LookupRetryStrategyFactory retryStrategyFactory;
+
+	/**
+	 * Current job manager future. Replaced on every leader notification or
+	 * leader retrieval error. Volatile so that replacements are visible to
+	 * lookups, including retries running on the actor system dispatcher.
+	 */
+	private volatile Future<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
+
+	/**
+	 * Creates the Akka-based {@link KvStateLocationLookupService}.
+	 *
+	 * @param leaderRetrievalService Leader retrieval service to use.
+	 * @param actorSystem Actor system to use.
+	 * @param askTimeout Timeout for JobManager ask-requests.
+	 * @param retryStrategyFactory Retry strategy if no JobManager available.
+	 */
+	AkkaKvStateLocationLookupService(
+			LeaderRetrievalService leaderRetrievalService,
+			ActorSystem actorSystem,
+			FiniteDuration askTimeout,
+			LookupRetryStrategyFactory retryStrategyFactory) {
+
+		this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
+		this.actorSystem = Preconditions.checkNotNull(actorSystem, "Actor system");
+		this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask Timeout");
+		this.retryStrategyFactory = Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory");
+	}
+
+	/**
+	 * Starts the service by registering it as listener at the leader
+	 * retrieval service.
+	 *
+	 * @throws RuntimeException If the leader retrieval service fails to start.
+	 */
+	@Override
+	public void start() {
+		try {
+			leaderRetrievalService.start(this);
+		} catch (Exception e) {
+			LOG.error("Failed to start leader retrieval service", e);
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Shuts the service down by stopping the leader retrieval service.
+	 *
+	 * @throws RuntimeException If the leader retrieval service fails to stop.
+	 */
+	@Override
+	public void shutDown() {
+		try {
+			leaderRetrievalService.stop();
+		} catch (Exception e) {
+			LOG.error("Failed to stop leader retrieval service", e);
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>Every call uses a fresh {@link LookupRetryStrategy} created by the
+	 * configured {@link LookupRetryStrategyFactory}.
+	 */
+	@Override
+	public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, final String registrationName) {
+		return getKvStateLookupInfo(jobId, registrationName, retryStrategyFactory.createRetryStrategy());
+	}
+
+	/**
+	 * Returns a future holding the {@link KvStateLocation} for the given job
+	 * and KvState registration name.
+	 *
+	 * <p>If there is currently no JobManager registered with the service, the
+	 * request is retried. The retry behaviour is specified by the given
+	 * {@link LookupRetryStrategy}. Each retry re-reads the current JobManager
+	 * future, so a leader notified in the meantime is picked up.
+	 *
+	 * @param jobId JobID the KvState instance belongs to
+	 * @param registrationName Name under which the KvState has been registered
+	 * @param lookupRetryStrategy Retry strategy to use for retries on UnknownJobManager failures.
+	 * @return Future holding the {@link KvStateLocation}
+	 */
+	@SuppressWarnings("unchecked")
+	private Future<KvStateLocation> getKvStateLookupInfo(
+			final JobID jobId,
+			final String registrationName,
+			final LookupRetryStrategy lookupRetryStrategy) {
+
+		return jobManagerFuture
+				.flatMap(new Mapper<ActorGateway, Future<Object>>() {
+					@Override
+					public Future<Object> apply(ActorGateway jobManager) {
+						// Lookup the KvStateLocation
+						Object msg = new KvStateMessage.LookupKvStateLocation(jobId, registrationName);
+						return jobManager.ask(msg, askTimeout);
+					}
+				}, actorSystem.dispatcher())
+				// Fails with a ClassCastException if the response has an unexpected type
+				.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
+				.recoverWith(new Recover<Future<KvStateLocation>>() {
+					@Override
+					public Future<KvStateLocation> recover(Throwable failure) throws Throwable {
+						// If the Future fails with UnknownJobManager, retry
+						// the request. Otherwise all Futures will be failed
+						// during the start up phase, when the JobManager did
+						// not notify this service yet or leadership is lost
+						// intermittently.
+						if (failure instanceof UnknownJobManager && lookupRetryStrategy.tryRetry()) {
+							return Patterns.after(
+									lookupRetryStrategy.getRetryDelay(),
+									actorSystem.scheduler(),
+									actorSystem.dispatcher(),
+									new Callable<Future<KvStateLocation>>() {
+										@Override
+										public Future<KvStateLocation> call() throws Exception {
+											return getKvStateLookupInfo(
+													jobId,
+													registrationName,
+													lookupRetryStrategy);
+										}
+									});
+						} else {
+							return Futures.failed(failure);
+						}
+					}
+				}, actorSystem.dispatcher());
+	}
+
+	/**
+	 * Replaces the cached JobManager future on a leader change.
+	 *
+	 * <p>A <code>null</code> leader address resets the service to the
+	 * "unknown JobManager" state; otherwise the address is resolved to an
+	 * actor and wrapped in an {@link AkkaActorGateway} carrying the leader
+	 * session ID.
+	 */
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Received leader address notification {}:{}", leaderAddress, leaderSessionID);
+		}
+
+		if (leaderAddress == null) {
+			jobManagerFuture = UNKNOWN_JOB_MANAGER;
+		} else {
+			jobManagerFuture = AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout)
+					.map(new Mapper<ActorRef, ActorGateway>() {
+						@Override
+						public ActorGateway apply(ActorRef actorRef) {
+							return new AkkaActorGateway(actorRef, leaderSessionID);
+						}
+					}, actorSystem.dispatcher());
+		}
+	}
+
+	/**
+	 * Fails all subsequent lookups with the given exception until the next
+	 * leader notification arrives.
+	 */
+	@Override
+	public void handleError(Exception exception) {
+		LOG.error("Leader retrieval service reported an error.", exception);
+		jobManagerFuture = Futures.failed(exception);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Retry strategy for failed lookups.
+	 *
+	 * <p>Usage:
+	 * <pre>
+	 * LookupRetryStrategy retryStrategy = retryStrategyFactory.createRetryStrategy();
+	 *
+	 * if (retryStrategy.tryRetry()) {
+	 *   // OK to retry
+	 *   FiniteDuration retryDelay = retryStrategy.getRetryDelay();
+	 * }
+	 * </pre>
+	 */
+	interface LookupRetryStrategy {
+
+		/**
+		 * Returns the current retry delay.
+		 *
+		 * @return Current retry delay.
+		 */
+		FiniteDuration getRetryDelay();
+
+		/**
+		 * Tries another retry and returns whether it is allowed or not.
+		 *
+		 * @return Whether it is allowed to do another retry or not.
+		 */
+		boolean tryRetry();
+
+	}
+
+	/**
+	 * Factory for retry strategies.
+	 */
+	interface LookupRetryStrategyFactory {
+
+		/**
+		 * Creates a new retry strategy.
+		 *
+		 * @return The retry strategy.
+		 */
+		LookupRetryStrategy createRetryStrategy();
+
+	}
+
+	/**
+	 * Factory for disabled retries. The returned strategy never allows a retry.
+	 */
+	static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
+
+		/** Stateless, hence shared by all lookups. */
+		private static final DisabledLookupRetryStrategy RETRY_STRATEGY = new DisabledLookupRetryStrategy();
+
+		@Override
+		public LookupRetryStrategy createRetryStrategy() {
+			return RETRY_STRATEGY;
+		}
+
+		private static class DisabledLookupRetryStrategy implements LookupRetryStrategy {
+
+			@Override
+			public FiniteDuration getRetryDelay() {
+				return FiniteDuration.Zero();
+			}
+
+			@Override
+			public boolean tryRetry() {
+				return false;
+			}
+		}
+
+	}
+
+	/**
+	 * Factory for fixed delay retries. Each created strategy allows at most
+	 * <code>maxRetries</code> retries, each delayed by <code>retryDelay</code>.
+	 */
+	static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
+
+		private final int maxRetries;
+		private final FiniteDuration retryDelay;
+
+		/**
+		 * Creates the factory.
+		 *
+		 * @param maxRetries Maximum number of retries per lookup (non-negative).
+		 * @param retryDelay Delay between retries.
+		 * @throws IllegalArgumentException If <code>maxRetries</code> is negative.
+		 * @throws NullPointerException If <code>retryDelay</code> is <code>null</code>.
+		 */
+		FixedDelayLookupRetryStrategyFactory(int maxRetries, FiniteDuration retryDelay) {
+			// Validate eagerly instead of failing later inside a lookup call
+			Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
+			this.maxRetries = maxRetries;
+			this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
+		}
+
+		@Override
+		public LookupRetryStrategy createRetryStrategy() {
+			return new FixedDelayLookupRetryStrategy(maxRetries, retryDelay);
+		}
+
+		private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy {
+
+			/** Guards numRetries; tryRetry may be called from dispatcher threads. */
+			private final Object retryLock = new Object();
+			private final int maxRetries;
+			private final FiniteDuration retryDelay;
+			private int numRetries;
+
+			public FixedDelayLookupRetryStrategy(int maxRetries, FiniteDuration retryDelay) {
+				Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
+				this.maxRetries = maxRetries;
+				this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
+			}
+
+			@Override
+			public FiniteDuration getRetryDelay() {
+				// Final field, no synchronization required
+				return retryDelay;
+			}
+
+			@Override
+			public boolean tryRetry() {
+				synchronized (retryLock) {
+					if (numRetries < maxRetries) {
+						numRetries++;
+						return true;
+					} else {
+						return false;
+					}
+				}
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/775a7878/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
new file mode 100644
index 0000000..cce432e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import scala.concurrent.Future;
+
+/**
+ * Lookup service resolving the {@link KvStateLocation} of registered KvState
+ * instances.
+ *
+ * <p>The service has an explicit lifecycle: {@link #start()} starts it and
+ * {@link #shutDown()} shuts it down again.
+ */
+public interface KvStateLocationLookupService {
+
+	/**
+	 * Asynchronously looks up the {@link KvStateLocation} of a KvState instance.
+	 *
+	 * @param jobId            ID of the job the KvState instance belongs to
+	 * @param registrationName Name the KvState instance has been registered under
+	 * @return Future completed with the {@link KvStateLocation} of the instance
+	 */
+	Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String registrationName);
+
+	/**
+	 * Starts the lookup service.
+	 */
+	void start();
+
+	/**
+	 * Shuts the lookup service down.
+	 */
+	void shutDown();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/775a7878/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java
new file mode 100644
index 0000000..3549ed6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+/**
+ * Exception used to fail lookup futures while no JobManager is known to the
+ * {@link KvStateLocationLookupService}.
+ */
+class UnknownJobManager extends Exception {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Message shared by all instances of this exception. */
+	private static final String MESSAGE =
+			"Unknown JobManager. Either the JobManager has not registered yet or has lost leadership.";
+
+	public UnknownJobManager() {
+		super(MESSAGE);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/775a7878/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
new file mode 100644
index 0000000..e9950fb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategy;
+import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
+import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
+import org.apache.flink.util.Preconditions;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AkkaKvStateLocationLookupServiceTest {
+
+	/** The default timeout. */
+	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+
+	/** Test actor system shared between the tests. */
+	private static ActorSystem testActorSystem;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (testActorSystem != null) {
+			testActorSystem.shutdown();
+		}
+	}
+
+	/**
+	 * Tests responses if no leader notification has been reported or leadership
+	 * has been lost (leaderAddress = <code>null</code>).
+	 */
+	@Test
+	public void testNoJobManagerRegistered() throws Exception {
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+		lookupService.start();
+
+		try {
+			//
+			// No leader registered initially => fail with UnknownJobManager
+			//
+			try {
+				JobID jobId = new JobID();
+				String name = "coffee";
+
+				Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(jobId, name);
+
+				Await.result(locationFuture, TIMEOUT);
+				fail("Did not throw expected Exception");
+			} catch (UnknownJobManager ignored) {
+				// Expected
+			}
+
+			assertEquals("Received unexpected lookup", 0, received.size());
+
+			//
+			// Leader registration => communicate with new leader
+			//
+			UUID leaderSessionId = null;
+			KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
+
+			ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected);
+
+			String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+			// Notify the service about a leader
+			leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId);
+
+			JobID jobId = new JobID();
+			String name = "tea";
+
+			// Verify that the leader response is handled
+			KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT);
+			assertEquals(expected, location);
+
+			// Verify that the correct message was sent to the leader
+			assertEquals(1, received.size());
+
+			verifyLookupMsg(received.poll(), jobId, name);
+
+			//
+			// Leader loss => fail with UnknownJobManager
+			//
+			leaderRetrievalService.notifyListener(null, null);
+
+			try {
+				Future<KvStateLocation> locationFuture = lookupService
+						.getKvStateLookupInfo(new JobID(), "coffee");
+
+				Await.result(locationFuture, TIMEOUT);
+				fail("Did not throw expected Exception");
+			} catch (UnknownJobManager ignored) {
+				// Expected
+			}
+
+			// No new messages received
+			assertEquals(0, received.size());
+		} finally {
+			lookupService.shutDown();
+		}
+	}
+
+	/**
+	 * Tests that messages are properly decorated with the leader session ID.
+	 */
+	@Test
+	public void testLeaderSessionIdChange() throws Exception {
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+		lookupService.start();
+
+		try {
+			// Create test actors with random leader session IDs
+			KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt");
+			UUID leaderSessionId1 = UUID.randomUUID();
+			ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1);
+			String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1);
+
+			KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper");
+			UUID leaderSessionId2 = UUID.randomUUID();
+			// The second leader must expect its own session ID, not the first one's
+			ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId2, expected2);
+			String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2);
+
+			JobID jobId = new JobID();
+
+			//
+			// Notify about first leader
+			//
+			leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1);
+
+			KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
+			assertEquals(expected1, location);
+
+			assertEquals(1, received.size());
+			verifyLookupMsg(received.poll(), jobId, "rock");
+
+			//
+			// Notify about second leader
+			//
+			leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2);
+
+			location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
+			assertEquals(expected2, location);
+
+			assertEquals(1, received.size());
+			verifyLookupMsg(received.poll(), jobId, "roll");
+		} finally {
+			lookupService.shutDown();
+		}
+	}
+
+	/**
+	 * Tests that lookups are retried when no leader notification is available.
+	 */
+	@Test
+	public void testRetryOnUnknownJobManager() throws Exception {
+		final Queue<LookupRetryStrategy> retryStrategies = new LinkedBlockingQueue<>();
+
+		LookupRetryStrategyFactory retryStrategy =
+				new LookupRetryStrategyFactory() {
+					@Override
+					public LookupRetryStrategy createRetryStrategy() {
+						return retryStrategies.poll();
+					}
+				};
+
+		final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				retryStrategy);
+
+		lookupService.start();
+
+		try {
+			//
+			// Test call to retry
+			//
+			final AtomicBoolean hasRetried = new AtomicBoolean();
+			retryStrategies.add(
+					new LookupRetryStrategy() {
+						@Override
+						public FiniteDuration getRetryDelay() {
+							return FiniteDuration.Zero();
+						}
+
+						@Override
+						public boolean tryRetry() {
+							// Allow exactly one retry
+							return hasRetried.compareAndSet(false, true);
+						}
+					});
+
+			Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(new JobID(), "yessir");
+
+			Await.ready(locationFuture, TIMEOUT);
+			assertTrue("Did not retry ", hasRetried.get());
+
+			//
+			// Test leader notification after retry
+			//
+			Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+			KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic");
+			ActorRef testActor = LookupResponseActor.create(received, null, expected);
+			final String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+			retryStrategies.add(new LookupRetryStrategy() {
+				@Override
+				public FiniteDuration getRetryDelay() {
+					return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+				}
+
+				@Override
+				public boolean tryRetry() {
+					// Announce the leader while retrying so the retry succeeds
+					leaderRetrievalService.notifyListener(testActorAddress, null);
+					return true;
+				}
+			});
+
+			KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT);
+			assertEquals(expected, location);
+		} finally {
+			lookupService.shutDown();
+		}
+	}
+
+	/**
+	 * Tests that a response of an unexpected type fails the lookup future.
+	 */
+	@Test
+	public void testUnexpectedResponseType() throws Exception {
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+		lookupService.start();
+
+		try {
+			// Test actor answering with a String instead of a KvStateLocation
+			String expected = "unexpected-response-type";
+			ActorRef testActor = LookupResponseActor.create(received, null, expected);
+			String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+			leaderRetrievalService.notifyListener(testActorAddress, null);
+
+			try {
+				Await.result(lookupService.getKvStateLookupInfo(new JobID(), "spicy"), TIMEOUT);
+				fail("Did not throw expected Exception");
+			} catch (ClassCastException ignored) {
+				// Expected: mapTo fails on the unexpected response type. Do not
+				// catch Throwable here, which would also swallow the
+				// AssertionError thrown by fail(...).
+			}
+		} finally {
+			lookupService.shutDown();
+		}
+	}
+
+	/**
+	 * Test actor recording received lookups and answering them with the
+	 * configured responses, one response per lookup.
+	 */
+	private final static class LookupResponseActor extends FlinkUntypedActor {
+
+		/** Received lookup messages */
+		private final Queue<LookupKvStateLocation> receivedLookups;
+
+		/** Responses on KvStateMessage.LookupKvStateLocation messages */
+		private final Queue<Object> lookupResponses;
+
+		/** The leader session ID */
+		private UUID leaderSessionId;
+
+		public LookupResponseActor(
+				Queue<LookupKvStateLocation> receivedLookups,
+				UUID leaderSessionId, Object... lookupResponses) {
+
+			this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups");
+			this.leaderSessionId = leaderSessionId;
+			this.lookupResponses = new ArrayDeque<>();
+
+			if (lookupResponses != null) {
+				for (Object resp : lookupResponses) {
+					this.lookupResponses.add(resp);
+				}
+			}
+		}
+
+		@Override
+		public void handleMessage(Object message) throws Exception {
+			if (message instanceof LookupKvStateLocation) {
+				// Add to received lookups queue
+				receivedLookups.add((LookupKvStateLocation) message);
+
+				Object msg = lookupResponses.poll();
+				if (msg != null) {
+					if (msg instanceof Throwable) {
+						sender().tell(new Status.Failure((Throwable) msg), self());
+					} else {
+						sender().tell(new Status.Success(msg), self());
+					}
+				}
+			} else if (message instanceof UUID) {
+				this.leaderSessionId = (UUID) message;
+			} else {
+				LOG.debug("Received unhandled message: {}", message);
+			}
+		}
+
+		@Override
+		protected UUID getLeaderSessionID() {
+			return leaderSessionId;
+		}
+
+		private static ActorRef create(
+				Queue<LookupKvStateLocation> receivedLookups,
+				UUID leaderSessionId,
+				Object... lookupResponses) {
+
+			return testActorSystem.actorOf(Props.create(
+					LookupResponseActor.class,
+					receivedLookups,
+					leaderSessionId,
+					lookupResponses));
+		}
+	}
+
+	private static void verifyLookupMsg(
+			LookupKvStateLocation lookUpMsg,
+			JobID expectedJobId,
+			String expectedName) {
+
+		assertNotNull(lookUpMsg);
+		assertEquals(expectedJobId, lookUpMsg.getJobId());
+		assertEquals(expectedName, lookUpMsg.getRegistrationName());
+	}
+
+}