You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/12 14:51:24 UTC

[flink] branch release-1.4 updated (7b34a03 -> 07ab3d9)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7b34a03  [FLINK-10172] [table] Fix Table.orderBy(String) by adding asc & desc to ExpressionParser.
     add 9ad1e4a  [hotfix] Fix checkstyle violations in ZooKeeperStateHandleStore
     add 04cfe3d  [hotfix] Fix checkstyle violations in ZooKeeperCompletedCheckpointStore
     add b91ae88  [FLINK-10185] Make ZooKeeperStateHandleStore#releaseAndTryRemove synchronous
     add fe8ddc5  [hotfix] Fix checkstyle violations in ZooKeeperUtils
     add d2a828c  [FLINK-10011] Introduce SubmittedJobGraphStore#releaseJobGraph
     new 07ab3d9  [FLINK-10011] Release JobGraph after losing leadership in JobManager

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../services/ZooKeeperMesosServices.java           |   3 +-
 .../ZooKeeperCompletedCheckpointStore.java         |  95 +++----
 .../zookeeper/ZooKeeperHaServices.java             |   2 +-
 .../StandaloneSubmittedJobGraphStore.java          |  13 +-
 .../runtime/jobmanager/SubmittedJobGraphStore.java |  12 +
 .../ZooKeeperSubmittedJobGraphStore.java           |  31 ++-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |  56 ++--
 .../zookeeper/ZooKeeperStateHandleStore.java       | 172 ++-----------
 .../runtime/zookeeper/ZooKeeperUtilityFactory.java |  14 +-
 .../flink/runtime/jobmanager/JobManager.scala      |  39 +--
 .../checkpoint/CompletedCheckpointStoreTest.java   |   9 +-
 ...KeeperCompletedCheckpointStoreMockitoTest.java} |  19 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     | 286 +++++++--------------
 .../jobmanager/JobManagerHARecoveryTest.java       |   5 +
 .../jobmanager/ZooKeeperHAJobManagerTest.java      | 203 +++++++++++++++
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java    |  14 +-
 .../flink/runtime/zookeeper/ZooKeeperResource.java |  72 ++++++
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 115 +++------
 .../testingUtils/TestingJobManagerLike.scala       |   8 +
 .../testingUtils/TestingJobManagerMessages.scala   |   3 +
 20 files changed, 599 insertions(+), 572 deletions(-)
 copy flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/{ZooKeeperCompletedCheckpointStoreTest.java => ZooKeeperCompletedCheckpointStoreMockitoTest.java} (95%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java


[flink] 01/01: [FLINK-10011] Release JobGraph after losing leadership in JobManager

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07ab3d9bc567cf67b0d5cb7ae55a185f898cc766
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Aug 21 00:14:32 2018 +0200

    [FLINK-10011] Release JobGraph after losing leadership in JobManager
    
    The JobManager now releases its lock on all JobGraphs it has stored in
    the SubmittedJobGraphStore if the JobManager loses leadership. This ensures
    that a different JobManager can delete these jobs after it has recovered
    them and they have reached a globally terminal state. This is especially
    important when using stand-by JobManagers, where a former leader might
    still be connected to ZooKeeper and, thus, keep all its ephemeral nodes/locks.
    
    This closes #6590.
---
 .../flink/runtime/jobmanager/JobManager.scala      |  39 ++--
 .../jobmanager/JobManagerHARecoveryTest.java       |   5 +
 .../jobmanager/ZooKeeperHAJobManagerTest.java      | 203 +++++++++++++++++++++
 .../testingUtils/TestingJobManagerLike.scala       |   8 +
 .../testingUtils/TestingJobManagerMessages.scala   |   3 +
 5 files changed, 241 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index af42870..2728696 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1727,27 +1727,27 @@ class JobManager(
     // Don't remove the job yet...
     val futureOption = currentJobs.get(jobID) match {
       case Some((eg, _)) =>
-        val result = if (removeJobFromStateBackend) {
-          val futureOption = Some(future {
-            try {
+        val result = Some(future {
+          try {
+            if (removeJobFromStateBackend) {
               // ...otherwise, we can have lingering resources when there is a  concurrent shutdown
               // and the ZooKeeper client is closed. Not removing the job immediately allow the
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
-            } catch {
-              case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
+            } else {
+              submittedJobGraphs.releaseJobGraph(jobID) // keep it, only release our lock
             }
-          }(context.dispatcher))
+          } catch {
+            case t: Throwable => log.warn(s"Could not remove or release job graph $jobID.", t)
+          }
+        }(context.dispatcher))
 
+        if (removeJobFromStateBackend) {
           try {
             archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive()))
           } catch {
             case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
           }
-
-          futureOption
-        } else {
-          None
         }
 
         currentJobs.remove(jobID)
@@ -1772,18 +1772,23 @@ class JobManager(
     */
   private def cancelAndClearEverything(cause: Throwable)
     : Seq[Future[Unit]] = {
-    val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
-      future {
-        eg.suspend(cause)
+    // Iterate over a snapshot: removeJob removes the entries from currentJobs.
+    val futures = currentJobs.values.toList.flatMap(
+      egJobInfo => {
+        val executionGraph = egJobInfo._1
+        val jobInfo = egJobInfo._2
+
+        executionGraph.suspend(cause)
+
+        val jobId = executionGraph.getJobID
 
         jobInfo.notifyNonDetachedClients(
           decorateMessage(
             Failure(
-              new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))))
-      }(context.dispatcher)
-    }
+              new JobExecutionException(jobId, "All jobs are cancelled and cleared.", cause))))
 
-    currentJobs.clear()
+        removeJob(jobId, false) // releases (not removes) the stored job graph
+      })
 
     futures.toSeq
   }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 88141d6..ea5e01f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -515,6 +515,11 @@ public class JobManagerHARecoveryTest extends TestLogger {
 		}
 
 		@Override
+		public void releaseJobGraph(JobID jobId) throws Exception {
+			// no op, presumably this test store holds no locks that would need releasing
+		}
+
+		@Override
 		public Collection<JobID> getJobIds() throws Exception {
 			return storedJobs.keySet();
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
new file mode 100644
index 0000000..eb58557
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.PoisonPill;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		final Future<Terminated> terminationFuture = system.terminate();
+		Await.ready(terminationFuture, TIMEOUT);
+	}
+
+	/**
+	 * Tests that the {@link JobManager} releases its locks on all stored {@link JobGraph}s
+	 * when it loses leadership, so that another store instance can remove them afterwards.
+	 */
+	@Test
+	public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, leaderElectionService);
+		highAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
+		highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+
+		final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration);
+		otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+		ActorRef jobManagerActorRef = null;
+		try {
+			jobManagerActorRef = JobManager.startJobManagerActors(
+				configuration,
+				system,
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
+				new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration)),
+				Option.empty(),
+				TestingJobManager.class,
+				MemoryArchivist.class)._1();
+
+			waitForActorToBeStarted(jobManagerActorRef, TIMEOUT);
+
+			final ActorGateway jobManager = new AkkaActorGateway(jobManagerActorRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+			leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
+
+			final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
+
+			final JobManagerMessages.SubmitJob submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, ListeningBehaviour.DETACHED);
+
+			Await.result(jobManager.ask(submitJobMessage, TIMEOUT), TIMEOUT);
+
+			Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+			final JobID jobId = nonEmptyJobGraph.getJobID();
+			assertThat(jobIds, contains(jobId));
+
+			// revoke the leadership
+			leaderElectionService.notLeader();
+			// wait for the JobManager's background futures, presumably including the job graph release
+			Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT);
+			// recover with the actor system in scope, presumably needed to deserialize the job graph
+			final SubmittedJobGraph recoveredJobGraph = akka.serialization.JavaSerializer.currentSystem().withValue(
+				((ExtendedActorSystem) system),
+				() -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
+
+			assertThat(recoveredJobGraph, is(notNullValue()));
+			// removal by the other store should only succeed once the former leader released its lock
+			otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+			jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+			assertThat(jobIds, not(contains(jobId)));
+		} finally {
+			client.close();
+			otherClient.close();
+			// NOTE(review): the clients are closed before the JobManager actor is stopped; consider reversing
+			if (jobManagerActorRef != null) {
+				jobManagerActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+	}
+
+	private JobGraph createNonEmptyJobGraph() {
+		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
+		noOpVertex.setInvokableClass(NoOpInvokable.class);
+		final JobGraph jobGraph = new JobGraph(noOpVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
+	}
+
+	private void waitForActorToBeStarted(ActorRef jobManagerActorRef, FiniteDuration timeout) throws InterruptedException, java.util.concurrent.TimeoutException {
+		Await.ready(Patterns.ask(jobManagerActorRef, new Identify(42), timeout.toMillis()), timeout);
+	}
+
+	enum NoOpSubmittedJobGraphListener implements SubmittedJobGraphStore.SubmittedJobGraphListener {
+		INSTANCE;
+
+		@Override
+		public void onAddedJobGraph(JobID jobId) {
+			// no op
+		}
+
+		@Override
+		public void onRemovedJobGraph(JobID jobId) {
+			// no op
+		}
+	}
+}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index cd88133..2609dc3 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -440,6 +440,14 @@ trait TestingJobManagerLike extends FlinkActor {
         val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
         receiver ! Acknowledge.get()
       }
+    // Replies to the sender once all futures in futuresToComplete (if any) have completed.
+    case WaitForBackgroundTasksToFinish =>
+      val future = futuresToComplete match {
+        case Some(futures) => Future.sequence(futures)
+        case None => Future.successful(Seq())
+      }
+
+      future.pipeTo(sender())
   }
 
   def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index f79c124..4decfcf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -58,6 +58,8 @@ object TestingJobManagerMessages {
 
   case object NotifyListeners
 
+  case object WaitForBackgroundTasksToFinish // answered once background futures complete
+
   case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
   case class TaskManagerTerminated(taskManager: ActorRef)
 
@@ -164,4 +166,5 @@ object TestingJobManagerMessages {
   def getClientConnected(): AnyRef = ClientConnected
   def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
 
+  def getWaitForBackgroundTasksToFinish(): AnyRef = WaitForBackgroundTasksToFinish
 }