You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/12 14:43:29 UTC
[flink] 06/08: [FLINK-10011] Release JobGraph after losing
leadership in JobManager
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c395e896e5eb69a8255c372ea656483b05c8f94d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Aug 21 00:14:32 2018 +0200
[FLINK-10011] Release JobGraph after losing leadership in JobManager
The JobManager now releases its lock on all JobGraphs it has stored in
the SubmittedJobGraphStore if the JobManager loses leadership. This ensures
that a different JobManager can delete these jobs after it has recovered
them and they have reached a globally terminal state. This is especially
important when using stand-by JobManagers, where a former leader might still
be connected to ZooKeeper and thus still hold all of its ephemeral nodes/locks.
---
.../org/apache/flink/runtime/akka/ActorUtils.java | 10 ++
.../flink/runtime/jobmanager/JobManager.scala | 39 +++--
.../flink/runtime/dispatcher/DispatcherHATest.java | 9 +-
.../flink/runtime/jobmanager/JobManagerTest.java | 3 +-
.../jobmanager/ZooKeeperHAJobManagerTest.java | 180 +++++++++++++++++++++
.../testingUtils/TestingJobManagerLike.scala | 8 +
.../testingUtils/TestingJobManagerMessages.scala | 3 +
7 files changed, 231 insertions(+), 21 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index f2f9059..9a99281 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,9 +19,11 @@
package org.apache.flink.runtime.akka;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
import akka.actor.ActorRef;
import akka.actor.Kill;
+import akka.actor.PoisonPill;
import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,5 +87,13 @@ public class ActorUtils {
return FutureUtils.completeAll(terminationFutures);
}
+ public static void stopActor(AkkaActorGateway akkaActorGateway) {
+ stopActor(akkaActorGateway.actor());
+ }
+
+ public static void stopActor(ActorRef actorRef) {
+ actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
private ActorUtils() {}
}
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0988730..6d27dc3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1729,21 +1729,22 @@ class JobManager(
val futureOption = currentJobs.remove(jobID) match {
case Some((eg, _)) =>
val cleanUpFuture: Future[Unit] = Future {
- val cleanupHABlobs = if (removeJobFromStateBackend) {
- try {
+ val cleanupHABlobs = try {
+ if (removeJobFromStateBackend) {
// ...otherwise, we can have lingering resources when there is a concurrent shutdown
// and the ZooKeeper client is closed. Not removing the job immediately allows the
// shutdown to release all resources.
submittedJobGraphs.removeJobGraph(jobID)
true
- } catch {
- case t: Throwable => {
- log.warn(s"Could not remove submitted job graph $jobID.", t)
- false
- }
+ } else {
+ submittedJobGraphs.releaseJobGraph(jobID)
+ false
+ }
+ } catch {
+ case t: Throwable => {
+ log.warn(s"Could not remove submitted job graph $jobID.", t)
+ false
}
- } else {
- false
}
blobServer.cleanupJob(jobID, cleanupHABlobs)
@@ -1778,19 +1779,23 @@ class JobManager(
*/
private def cancelAndClearEverything(cause: Throwable)
: Seq[Future[Unit]] = {
- val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
- future {
- eg.suspend(cause)
- jobManagerMetricGroup.removeJob(eg.getJobID)
+
+ val futures = currentJobs.values.flatMap(
+ egJobInfo => {
+ val executionGraph = egJobInfo._1
+ val jobInfo = egJobInfo._2
+
+ executionGraph.suspend(cause)
+
+ val jobId = executionGraph.getJobID
jobInfo.notifyNonDetachedClients(
decorateMessage(
Failure(
- new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))))
- }(context.dispatcher)
- }
+ new JobExecutionException(jobId, "All jobs are cancelled and cleared.", cause))))
- currentJobs.clear()
+ removeJob(jobId, false)
+ })
futures.toSeq
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 5876c5f..adf7618 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -162,9 +163,13 @@ public class DispatcherHATest extends TestLogger {
}
@Nonnull
- private JobGraph createNonEmptyJobGraph() {
+ public static JobGraph createNonEmptyJobGraph() {
final JobVertex noOpVertex = new JobVertex("NoOp vertex");
- return new JobGraph(noOpVertex);
+ noOpVertex.setInvokableClass(NoOpInvokable.class);
+ final JobGraph jobGraph = new JobGraph(noOpVertex);
+ jobGraph.setAllowQueuedScheduling(true);
+
+ return jobGraph;
}
private static class HATestingDispatcher extends TestingDispatcher {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 052349c..098f564 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -32,8 +32,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
@@ -152,7 +152,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import static org.mockito.Mockito.mock;
public class JobManagerTest extends TestLogger {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
new file mode 100644
index 0000000..8e5b1b9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+ @ClassRule
+ public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
+
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setup() {
+ system = AkkaUtils.createLocalActorSystem(new Configuration());
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ final Future<Terminated> terminationFuture = system.terminate();
+ Await.ready(terminationFuture, TIMEOUT);
+ }
+
+ /**
+ * Tests that the {@link JobManager} releases all locked {@link JobGraph JobGraphs} if it loses
+ * leadership.
+ */
+ @Test
+ public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+ configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+ try (TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices()) {
+
+ final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+ final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+ highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, leaderElectionService);
+ highAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
+ highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+
+ final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
+ final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration);
+ otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+ ActorRef jobManagerActorRef = null;
+ try {
+ jobManagerActorRef = JobManager.startJobManagerActors(
+ configuration,
+ system,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
+ NoOpMetricRegistry.INSTANCE,
+ Option.empty(),
+ TestingJobManager.class,
+ MemoryArchivist.class)._1();
+
+ waitForActorToBeStarted(jobManagerActorRef, TIMEOUT);
+
+ final ActorGateway jobManager = new AkkaActorGateway(jobManagerActorRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+ leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
+
+ final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+
+ final JobManagerMessages.SubmitJob submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, ListeningBehaviour.DETACHED);
+
+ Await.result(jobManager.ask(submitJobMessage, TIMEOUT), TIMEOUT);
+
+ Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+ final JobID jobId = nonEmptyJobGraph.getJobID();
+ assertThat(jobIds, contains(jobId));
+
+ // revoke the leadership
+ leaderElectionService.notLeader();
+
+ Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT);
+
+ final SubmittedJobGraph recoveredJobGraph = akka.serialization.JavaSerializer.currentSystem().withValue(
+ ((ExtendedActorSystem) system),
+ () -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
+
+ assertThat(recoveredJobGraph, is(notNullValue()));
+
+ otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+ jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+ assertThat(jobIds, not(contains(jobId)));
+ } finally {
+ client.close();
+ otherClient.close();
+
+ if (jobManagerActorRef != null) {
+ ActorUtils.stopActor(jobManagerActorRef);
+ }
+ }
+ }
+ }
+
+ private void waitForActorToBeStarted(ActorRef jobManagerActorRef, FiniteDuration timeout) throws InterruptedException, java.util.concurrent.TimeoutException {
+ Await.ready(Patterns.ask(jobManagerActorRef, new Identify(42), timeout.toMillis()), timeout);
+ }
+}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 0640f39..ebe4639 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -454,6 +454,14 @@ trait TestingJobManagerLike extends FlinkActor {
val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
receiver ! Acknowledge.get()
}
+
+ case WaitForBackgroundTasksToFinish =>
+ val future = futuresToComplete match {
+ case Some(futures) => Future.sequence(futures)
+ case None => Future.successful(Seq())
+ }
+
+ future.pipeTo(sender())
}
def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index c8529a9..64af056 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -59,6 +59,8 @@ object TestingJobManagerMessages {
case object NotifyListeners
+ case object WaitForBackgroundTasksToFinish
+
case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
case class TaskManagerTerminated(taskManager: ActorRef)
@@ -164,4 +166,5 @@ object TestingJobManagerMessages {
def getClientConnected(): AnyRef = ClientConnected
def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
+ def getWaitForBackgroundTasksToFinish(): AnyRef = WaitForBackgroundTasksToFinish
}