You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/09 16:46:51 UTC
[3/4] flink git commit: [FLINK-5193] [jm] Harden job recovery in case of recovery failures
[FLINK-5193] [jm] Harden job recovery in case of recovery failures
When recovering multiple jobs, a single recovery failure prevented all of the jobs from being recovered.
This PR changes this behaviour to make the recovery of jobs independent so that a single
failure won't stall the complete recovery. Furthermore, this PR improves the error reporting
for failures originating in the ZooKeeperSubmittedJobGraphStore.
Add test case
Fix failing JobManagerHACheckpointRecoveryITCase
This closes #2909.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/add3765d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/add3765d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/add3765d
Branch: refs/heads/master
Commit: add3765d1626a04fb98b8f36cb725eb32806d8b6
Parents: ea70807
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 29 17:31:08 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 9 14:42:58 2016 +0100
----------------------------------------------------------------------
.../StandaloneSubmittedJobGraphStore.java | 11 +-
.../jobmanager/SubmittedJobGraphStore.java | 19 +--
.../ZooKeeperSubmittedJobGraphStore.java | 109 +++++++------
.../zookeeper/ZooKeeperStateHandleStore.java | 44 ++++-
.../flink/runtime/jobmanager/JobManager.scala | 38 ++---
.../jobmanager/JobManagerHARecoveryTest.java | 161 ++++++++++++++++++-
.../StandaloneSubmittedJobGraphStoreTest.java | 11 +-
.../ZooKeeperSubmittedJobGraphsStoreITCase.java | 29 ++--
.../JobManagerHACheckpointRecoveryITCase.java | 4 +-
9 files changed, 307 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index 3041cde..d1ca1a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -20,10 +20,9 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.Option;
+import java.util.Collection;
import java.util.Collections;
-import java.util.List;
/**
* {@link SubmittedJobGraph} instances for JobManagers running in {@link HighAvailabilityMode#NONE}.
@@ -54,12 +53,12 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore
}
@Override
- public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
- return Option.empty();
+ public Collection<JobID> getJobIds() throws Exception {
+ return Collections.emptyList();
}
@Override
- public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
- return Collections.emptyList();
+ public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index bd628cd..55c2e79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,10 +19,8 @@
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.Option;
-import java.util.List;
+import java.util.Collection;
/**
* {@link SubmittedJobGraph} instances for recovery.
@@ -40,16 +38,11 @@ public interface SubmittedJobGraphStore {
void stop() throws Exception;
/**
- * Returns a list of all submitted {@link JobGraph} instances.
- */
- List<SubmittedJobGraph> recoverJobGraphs() throws Exception;
-
- /**
* Returns the {@link SubmittedJobGraph} with the given {@link JobID}.
*
* <p>An Exception is thrown, if no job graph with the given ID exists.
*/
- Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception;
+ SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception;
/**
* Adds the {@link SubmittedJobGraph} instance.
@@ -64,6 +57,14 @@ public interface SubmittedJobGraphStore {
void removeJobGraph(JobID jobId) throws Exception;
/**
+ * Get all job ids of submitted job graphs to the submitted job graph store.
+ *
+ * @return Collection of submitted job ids
+ * @throws Exception if the operation fails
+ */
+ Collection<JobID> getJobIds() throws Exception;
+
+ /**
* A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
* multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index c1dc656..aaafa76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -24,18 +24,15 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -156,47 +153,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
@Override
- public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
- synchronized (cacheLock) {
- verifyIsRunning();
-
- LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
- List<Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String>> submitted;
-
- while (true) {
- try {
- submitted = jobGraphsInZooKeeper.getAll();
- break;
- }
- catch (ConcurrentModificationException e) {
- LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
- }
- }
-
- LOG.info("Found {} job graphs.", submitted.size());
-
- if (submitted.size() != 0) {
- List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
-
- for (Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String> jobStateHandle : submitted) {
- SubmittedJobGraph jobGraph = jobStateHandle.f0.retrieveState();
- addedJobGraphs.add(jobGraph.getJobId());
-
- jobGraphs.add(jobGraph);
- }
-
- LOG.info("Recovered {} job graphs: {}.", jobGraphs.size(), jobGraphs);
- return jobGraphs;
- }
- else {
- LOG.info("No job graph to recover.");
- return Collections.emptyList();
- }
- }
- }
-
- @Override
- public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+ public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
checkNotNull(jobId, "Job ID");
String path = getPathForJob(jobId);
@@ -205,17 +162,29 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
synchronized (cacheLock) {
verifyIsRunning();
- try {
- SubmittedJobGraph jobGraph = jobGraphsInZooKeeper.get(path).retrieveState();
- addedJobGraphs.add(jobGraph.getJobId());
+ RetrievableStateHandle<SubmittedJobGraph> jobGraphRetrievableStateHandle;
- LOG.info("Recovered {}.", jobGraph);
-
- return Option.apply(jobGraph);
+ try {
+ jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.get(path);
+ } catch (KeeperException.NoNodeException ignored) {
+ return null;
+ } catch (Exception e) {
+ throw new Exception("Could not retrieve the submitted job graph state handle " +
+ "for " + path + "from the submitted job graph store.", e);
}
- catch (KeeperException.NoNodeException ignored) {
- return Option.empty();
+ SubmittedJobGraph jobGraph;
+
+ try {
+ jobGraph = jobGraphRetrievableStateHandle.retrieveState();
+ } catch (Exception e) {
+ throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
}
+
+ addedJobGraphs.add(jobGraph.getJobId());
+
+ LOG.info("Recovered {}.", jobGraph);
+
+ return jobGraph;
}
}
@@ -283,6 +252,31 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
LOG.info("Removed job graph {} from ZooKeeper.", jobId);
}
+ @Override
+ public Collection<JobID> getJobIds() throws Exception {
+ Collection<String> paths;
+
+ LOG.debug("Retrieving all stored job ids from ZooKeeper under {}.", zooKeeperFullBasePath);
+
+ try {
+ paths = jobGraphsInZooKeeper.getAllPaths();
+ } catch (Exception e) {
+ throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e);
+ }
+
+ List<JobID> jobIds = new ArrayList<>(paths.size());
+
+ for (String path : paths) {
+ try {
+ jobIds.add(jobIdfromPath(path));
+ } catch (Exception exception) {
+ LOG.warn("Could not parse job id from {}. This indicates a malformed path.", path, exception);
+ }
+ }
+
+ return jobIds;
+ }
+
/**
* Monitors ZooKeeper for changes.
*
@@ -405,4 +399,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
return String.format("/%s", jobId);
}
+ /**
+ * Returns the JobID from the given path in ZooKeeper.
+ *
+ * @param path in ZooKeeper
+ * @return JobID associated with the given path
+ */
+ public static JobID jobIdfromPath(final String path) {
+ return JobID.fromHexString(path);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 14d9f6f..dd32efb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -30,8 +30,11 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
@@ -225,8 +228,45 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
public RetrievableStateHandle<T> get(String pathInZooKeeper) throws Exception {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
- byte[] data = client.getData().forPath(pathInZooKeeper);
- return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader());
+ byte[] data;
+
+ try {
+ data = client.getData().forPath(pathInZooKeeper);
+ } catch (Exception e) {
+ throw new Exception("Failed to retrieve state handle data under " + pathInZooKeeper +
+ " from ZooKeeper.", e);
+ }
+
+ try {
+ return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new Exception("Failed to deserialize state handle from ZooKeeper data from " +
+ pathInZooKeeper + '.', e);
+ }
+ }
+
+ /**
+ * Return a list of all valid paths for state handles.
+ *
+ * @return List of valid state handle paths in ZooKeeper
+ * @throws Exception if a ZooKeeper operation fails
+ */
+ public Collection<String> getAllPaths() throws Exception {
+ final String path = "/";
+
+ while(true) {
+ Stat stat = client.checkExists().forPath(path);
+
+ if (stat == null) {
+ return Collections.emptyList();
+ } else {
+ try {
+ return client.getChildren().forPath(path);
+ } catch (KeeperException.NoNodeException ignored) {
+ // Concurrent deletion, retry
+ }
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 982efe8..1dfd3db 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -495,6 +495,8 @@ class JobManager(
case RecoverSubmittedJob(submittedJobGraph) =>
if (!currentJobs.contains(submittedJobGraph.getJobId)) {
+ log.info(s"Submitting recovered job ${submittedJobGraph.getJobId}.")
+
submitJob(
submittedJobGraph.getJobGraph(),
submittedJobGraph.getJobInfo(),
@@ -516,7 +518,7 @@ class JobManager(
log.info(s"Attempting to recover job $jobId.")
val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId)
- submittedJobGraphOption match {
+ Option(submittedJobGraphOption) match {
case Some(submittedJobGraph) =>
if (!leaderElectionService.hasLeadership()) {
// we've lost leadership. mission: abort.
@@ -529,37 +531,31 @@ class JobManager(
}
}
} catch {
- case t: Throwable => log.error(s"Failed to recover job $jobId.", t)
+ case t: Throwable => log.warn(s"Failed to recover job $jobId.", t)
}
}(context.dispatcher)
case RecoverAllJobs =>
future {
- try {
- // The ActorRef, which is part of the submitted job graph can only be
- // de-serialized in the scope of an actor system.
- akka.serialization.JavaSerializer.currentSystem.withValue(
- context.system.asInstanceOf[ExtendedActorSystem]) {
-
- log.info(s"Attempting to recover all jobs.")
+ log.info("Attempting to recover all jobs.")
- val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
+ try {
+ val jobIdsToRecover = submittedJobGraphs.getJobIds().asScala
- if (!leaderElectionService.hasLeadership()) {
- // we've lost leadership. mission: abort.
- log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
- s"jobs.")
- } else {
- log.info(s"Re-submitting ${jobGraphs.size} job graphs.")
+ if (jobIdsToRecover.isEmpty) {
+ log.info("There are no jobs to recover.")
+ } else {
+ log.info(s"There are ${jobIdsToRecover.size} jobs to recover. Starting the job " +
+ s"recovery.")
- jobGraphs.foreach{
- submittedJobGraph =>
- self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
- }
+ jobIdsToRecover foreach {
+ jobId => self ! decorateMessage(RecoverJob(jobId))
}
}
} catch {
- case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t)
+ case e: Exception =>
+ log.warn("Failed to recover job ids from submitted job graph store. Aborting " +
+ "recovery.", e)
}
}(context.dispatcher)
http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 69aac31..36412f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -20,8 +20,13 @@ package org.apache.flink.runtime.jobmanager;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.japi.pf.FI;
+import akka.japi.pf.ReceiveBuilder;
+import akka.pattern.Patterns;
+import akka.testkit.CallingThreadDispatcher;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
@@ -31,6 +36,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -39,8 +45,10 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceManager;
@@ -53,10 +61,12 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
@@ -77,25 +87,35 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Int;
import scala.Option;
+import scala.PartialFunction;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
+import scala.runtime.BoxedUnit;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class JobManagerHARecoveryTest {
@@ -296,6 +316,131 @@ public class JobManagerHARecoveryTest {
}
/**
+ * Tests that a failing job recovery won't cause other job recoveries to fail.
+ */
+ @Test
+ public void testFailingJobRecovery() throws Exception {
+ final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+ final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS);
+ Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow();
+ final Configuration flinkConfiguration = new Configuration();
+ UUID leaderSessionID = UUID.randomUUID();
+ ActorRef jobManager = null;
+ JobID jobId1 = new JobID();
+ JobID jobId2 = new JobID();
+
+ // set HA mode to zookeeper so that we try to recover jobs
+ flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+ try {
+ final SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
+
+ SubmittedJobGraph submittedJobGraph = mock(SubmittedJobGraph.class);
+ when(submittedJobGraph.getJobId()).thenReturn(jobId2);
+
+ when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(jobId1, jobId2));
+
+ // fail the first job recovery
+ when(submittedJobGraphStore.recoverJobGraph(eq(jobId1))).thenThrow(new Exception("Test exception"));
+ // succeed the second job recovery
+ when(submittedJobGraphStore.recoverJobGraph(eq(jobId2))).thenReturn(submittedJobGraph);
+
+ final TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
+
+ final Collection<JobID> recoveredJobs = new ArrayList<>(2);
+
+ Props jobManagerProps = Props.create(
+ TestingFailingHAJobManager.class,
+ flinkConfiguration,
+ Executors.directExecutor(),
+ Executors.directExecutor(),
+ mock(InstanceManager.class),
+ mock(Scheduler.class),
+ new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
+ ActorRef.noSender(),
+ new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+ timeout,
+ myLeaderElectionService,
+ submittedJobGraphStore,
+ mock(CheckpointRecoveryFactory.class),
+ jobRecoveryTimeout,
+ Option.<MetricRegistry>apply(null),
+ recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
+
+ jobManager = system.actorOf(jobManagerProps, "jobmanager");
+
+ Future<Object> started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());
+
+ Await.ready(started, deadline.timeLeft());
+
+ // make the job manager the leader --> this triggers the recovery of all jobs
+ myLeaderElectionService.isLeader(leaderSessionID);
+
+ // check that we have successfully recovered the second job
+ assertThat(recoveredJobs, containsInAnyOrder(jobId2));
+ } finally {
+ TestingUtils.stopActor(jobManager);
+ }
+ }
+
+ static class TestingFailingHAJobManager extends JobManager {
+
+ private final Collection<JobID> recoveredJobs;
+
+ public TestingFailingHAJobManager(
+ Configuration flinkConfiguration,
+ Executor futureExecutor,
+ Executor ioExecutor,
+ InstanceManager instanceManager,
+ Scheduler scheduler,
+ BlobLibraryCacheManager libraryCacheManager,
+ ActorRef archive,
+ RestartStrategyFactory restartStrategyFactory,
+ FiniteDuration timeout,
+ LeaderElectionService leaderElectionService,
+ SubmittedJobGraphStore submittedJobGraphs,
+ CheckpointRecoveryFactory checkpointRecoveryFactory,
+ FiniteDuration jobRecoveryTimeout,
+ Option<MetricRegistry> metricsRegistry,
+ Collection<JobID> recoveredJobs) {
+ super(
+ flinkConfiguration,
+ futureExecutor,
+ ioExecutor,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphs,
+ checkpointRecoveryFactory,
+ jobRecoveryTimeout,
+ metricsRegistry);
+
+ this.recoveredJobs = recoveredJobs;
+ }
+
+ @Override
+ public PartialFunction<Object, BoxedUnit> handleMessage() {
+ return ReceiveBuilder.match(
+ JobManagerMessages.RecoverSubmittedJob.class,
+ new FI.UnitApply<JobManagerMessages.RecoverSubmittedJob>() {
+ @Override
+ public void apply(JobManagerMessages.RecoverSubmittedJob submitJob) throws Exception {
+ recoveredJobs.add(submitJob.submittedJobGraph().getJobId());
+ }
+ }).matchAny(new FI.UnitApply<Object>() {
+ @Override
+ public void apply(Object o) throws Exception {
+ TestingFailingHAJobManager.super.handleMessage().apply(o);
+ }
+ }).build();
+ }
+ }
+
+ /**
* A checkpoint store, which supports shutdown and suspend. You can use this to test HA
* as long as the factory always returns the same store instance.
*/
@@ -391,16 +536,11 @@ public class JobManagerHARecoveryTest {
}
@Override
- public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
- return new ArrayList<>(storedJobs.values());
- }
-
- @Override
- public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+ public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
if (storedJobs.containsKey(jobId)) {
- return Option.apply(storedJobs.get(jobId));
+ return storedJobs.get(jobId);
} else {
- return Option.apply(null);
+ return null;
}
}
@@ -414,6 +554,11 @@ public class JobManagerHARecoveryTest {
storedJobs.remove(jobId);
}
+ @Override
+ public Collection<JobID> getJobIds() throws Exception {
+ return storedJobs.keySet();
+ }
+
boolean contains(JobID jobId) {
return storedJobs.containsKey(jobId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
index 8ebb7f8..079a10e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
@@ -19,14 +19,13 @@
package org.apache.flink.runtime.jobmanager;
import akka.actor.ActorRef;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
public class StandaloneSubmittedJobGraphStoreTest {
@@ -41,14 +40,14 @@ public class StandaloneSubmittedJobGraphStoreTest {
new JobGraph("testNoOps"),
new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE));
- assertEquals(0, jobGraphs.recoverJobGraphs().size());
+ assertEquals(0, jobGraphs.getJobIds().size());
jobGraphs.putJobGraph(jobGraph);
- assertEquals(0, jobGraphs.recoverJobGraphs().size());
+ assertEquals(0, jobGraphs.getJobIds().size());
jobGraphs.removeJobGraph(jobGraph.getJobGraph().getJobID());
- assertEquals(0, jobGraphs.recoverJobGraphs().size());
+ assertEquals(0, jobGraphs.getJobIds().size());
- assertTrue(jobGraphs.recoverJobGraph(new JobID()).isEmpty());
+ assertNull(jobGraphs.recoverJobGraph(new JobID()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index d156f02..9454d90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -40,8 +40,8 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -101,32 +101,36 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
// Empty state
- assertEquals(0, jobGraphs.recoverJobGraphs().size());
+ assertEquals(0, jobGraphs.getJobIds().size());
// Add initial
jobGraphs.putJobGraph(jobGraph);
// Verify initial job graph
- List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
- assertEquals(1, actual.size());
+ Collection<JobID> jobIds = jobGraphs.getJobIds();
+ assertEquals(1, jobIds.size());
- verifyJobGraphs(jobGraph, actual.get(0));
+ JobID jobId = jobIds.iterator().next();
+
+ verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
// Update (same ID)
jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1);
jobGraphs.putJobGraph(jobGraph);
// Verify updated
- actual = jobGraphs.recoverJobGraphs();
- assertEquals(1, actual.size());
+ jobIds = jobGraphs.getJobIds();
+ assertEquals(1, jobIds.size());
+
+ jobId = jobIds.iterator().next();
- verifyJobGraphs(jobGraph, actual.get(0));
+ verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
// Remove
jobGraphs.removeJobGraph(jobGraph.getJobId());
// Empty state
- assertEquals(0, jobGraphs.recoverJobGraphs().size());
+ assertEquals(0, jobGraphs.getJobIds().size());
// Nothing should have been notified
verify(listener, atMost(1)).onAddedJobGraph(any(JobID.class));
@@ -162,11 +166,12 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
jobGraphs.putJobGraph(jobGraph);
}
- List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
+ Collection<JobID> actual = jobGraphs.getJobIds();
assertEquals(expected.size(), actual.size());
- for (SubmittedJobGraph jobGraph : actual) {
+ for (JobID jobId : actual) {
+ SubmittedJobGraph jobGraph = jobGraphs.recoverJobGraph(jobId);
assertTrue(expected.containsKey(jobGraph.getJobId()));
verifyJobGraphs(expected.get(jobGraph.getJobId()), jobGraph);
@@ -175,7 +180,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
}
// Empty state
- assertEquals(0, jobGraphs.recoverJobGraphs().size());
+ assertEquals(0, jobGraphs.getJobIds().size());
// Nothing should have been notified
verify(listener, atMost(expected.size())).onAddedJobGraph(any(JobID.class));
http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 49eaeb7..3f08b5a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -370,7 +370,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
nonLeadingJobManagerProcess = jobManagerProcess[0];
}
- // BLocking JobGraph
+ // Blocking JobGraph
JobVertex blockingVertex = new JobVertex("Blocking vertex");
blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
JobGraph jobGraph = new JobGraph(blockingVertex);
@@ -399,7 +399,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
String output = nonLeadingJobManagerProcess.getProcessOutput();
if (output != null) {
- if (output.contains("Fatal error: Failed to recover jobs") &&
+ if (output.contains("Failed to recover job") &&
output.contains("java.io.FileNotFoundException")) {
success = true;