You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/12 14:51:25 UTC

[flink] 01/01: [FLINK-10011] Release JobGraph after losing leadership in JobManager

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07ab3d9bc567cf67b0d5cb7ae55a185f898cc766
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Aug 21 00:14:32 2018 +0200

    [FLINK-10011] Release JobGraph after losing leadership in JobManager
    
    The JobManager now releases its lock on all JobGraphs it has stored in
    the SubmittedJobGraphStore if the JobManager loses leadership. This ensures
    that a different JobManager can delete these jobs after it has recovered
    them and reached a globally terminal state. This is especially important
    when using stand-by JobManagers where a former leader might still be
    connected to ZooKeeper and, thus, keeping all ephemeral nodes/locks.
    
    This closes #6590.
---
 .../flink/runtime/jobmanager/JobManager.scala      |  39 ++--
 .../jobmanager/JobManagerHARecoveryTest.java       |   5 +
 .../jobmanager/ZooKeeperHAJobManagerTest.java      | 203 +++++++++++++++++++++
 .../testingUtils/TestingJobManagerLike.scala       |   8 +
 .../testingUtils/TestingJobManagerMessages.scala   |   3 +
 5 files changed, 241 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index af42870..2728696 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1727,27 +1727,27 @@ class JobManager(
     // Don't remove the job yet...
     val futureOption = currentJobs.get(jobID) match {
       case Some((eg, _)) =>
-        val result = if (removeJobFromStateBackend) {
-          val futureOption = Some(future {
-            try {
+        val result = Some(future {
+          try {
+            if (removeJobFromStateBackend) {
               // ...otherwise, we can have lingering resources when there is a  concurrent shutdown
               // and the ZooKeeper client is closed. Not removing the job immediately allow the
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
-            } catch {
-              case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
+            } else {
+              submittedJobGraphs.releaseJobGraph(jobID)
             }
-          }(context.dispatcher))
+          } catch {
+            case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
+          }
+        }(context.dispatcher))
 
+        if (removeJobFromStateBackend) {
           try {
             archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive()))
           } catch {
             case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
           }
-
-          futureOption
-        } else {
-          None
         }
 
         currentJobs.remove(jobID)
@@ -1772,18 +1772,23 @@ class JobManager(
     */
   private def cancelAndClearEverything(cause: Throwable)
     : Seq[Future[Unit]] = {
-    val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
-      future {
-        eg.suspend(cause)
+
+    val futures = currentJobs.values.flatMap(
+      egJobInfo => {
+        val executionGraph = egJobInfo._1
+        val jobInfo = egJobInfo._2
+
+        executionGraph.suspend(cause)
+
+        val jobId = executionGraph.getJobID
 
         jobInfo.notifyNonDetachedClients(
           decorateMessage(
             Failure(
-              new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))))
-      }(context.dispatcher)
-    }
+              new JobExecutionException(jobId, "All jobs are cancelled and cleared.", cause))))
 
-    currentJobs.clear()
+        removeJob(jobId, false)
+      })
 
     futures.toSeq
   }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 88141d6..ea5e01f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -515,6 +515,11 @@ public class JobManagerHARecoveryTest extends TestLogger {
 		}
 
 		@Override
+		public void releaseJobGraph(JobID jobId) throws Exception {
+			// no op
+		}
+
+		@Override
 		public Collection<JobID> getJobIds() throws Exception {
 			return storedJobs.keySet();
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
new file mode 100644
index 0000000..eb58557
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.PoisonPill;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		final Future<Terminated> terminationFuture = system.terminate();
+		Await.ready(terminationFuture, TIMEOUT);
+	}
+
+	/**
+	 * Tests that the {@link JobManager} releases all locked {@link JobGraph} if it loses
+	 * leadership.
+	 */
+	@Test
+	public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, leaderElectionService);
+		highAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
+		highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+
+		final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration);
+		otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+		ActorRef jobManagerActorRef = null;
+		try {
+			jobManagerActorRef = JobManager.startJobManagerActors(
+				configuration,
+				system,
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
+				new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration)),
+				Option.empty(),
+				TestingJobManager.class,
+				MemoryArchivist.class)._1();
+
+			waitForActorToBeStarted(jobManagerActorRef, TIMEOUT);
+
+			final ActorGateway jobManager = new AkkaActorGateway(jobManagerActorRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+			leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
+
+			final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
+
+			final JobManagerMessages.SubmitJob submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, ListeningBehaviour.DETACHED);
+
+			Await.result(jobManager.ask(submitJobMessage, TIMEOUT), TIMEOUT);
+
+			Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+			final JobID jobId = nonEmptyJobGraph.getJobID();
+			assertThat(jobIds, contains(jobId));
+
+			// revoke the leadership
+			leaderElectionService.notLeader();
+
+			Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT);
+
+			final SubmittedJobGraph recoveredJobGraph = akka.serialization.JavaSerializer.currentSystem().withValue(
+				((ExtendedActorSystem) system),
+				() -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
+
+			assertThat(recoveredJobGraph, is(notNullValue()));
+
+			otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+			jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+			assertThat(jobIds, not(contains(jobId)));
+		} finally {
+			client.close();
+			otherClient.close();
+
+			if (jobManagerActorRef != null) {
+				jobManagerActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+	}
+
+	private JobGraph createNonEmptyJobGraph() {
+		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
+		noOpVertex.setInvokableClass(NoOpInvokable.class);
+		final JobGraph jobGraph = new JobGraph(noOpVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
+	}
+
+	private void waitForActorToBeStarted(ActorRef jobManagerActorRef, FiniteDuration timeout) throws InterruptedException, java.util.concurrent.TimeoutException {
+		Await.ready(Patterns.ask(jobManagerActorRef, new Identify(42), timeout.toMillis()), timeout);
+	}
+
+	enum NoOpSubmittedJobGraphListener implements SubmittedJobGraphStore.SubmittedJobGraphListener {
+		INSTANCE;
+
+		@Override
+		public void onAddedJobGraph(JobID jobId) {
+			// no op
+		}
+
+		@Override
+		public void onRemovedJobGraph(JobID jobId) {
+			// no op
+		}
+	}
+}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index cd88133..2609dc3 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -440,6 +440,14 @@ trait TestingJobManagerLike extends FlinkActor {
         val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
         receiver ! Acknowledge.get()
       }
+
+    case WaitForBackgroundTasksToFinish =>
+      val future = futuresToComplete match {
+        case Some(futures) => Future.sequence(futures)
+        case None => Future.successful(Seq())
+      }
+
+      future.pipeTo(sender())
   }
 
   def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index f79c124..4decfcf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -58,6 +58,8 @@ object TestingJobManagerMessages {
 
   case object NotifyListeners
 
+  case object WaitForBackgroundTasksToFinish
+
   case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
   case class TaskManagerTerminated(taskManager: ActorRef)
 
@@ -164,4 +166,5 @@ object TestingJobManagerMessages {
   def getClientConnected(): AnyRef = ClientConnected
   def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
 
+  def getWaitForBackgroundTasksToFinish(): AnyRef = WaitForBackgroundTasksToFinish
 }