You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/12 14:43:31 UTC

[flink] 08/08: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore in Dispatcher

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a933d87e14488da9b7305a671d0cb2e24f43155c
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Aug 20 11:24:36 2018 +0200

    [FLINK-10011] Release JobGraph from SubmittedJobGraphStore in Dispatcher
    
    The Dispatcher now releases all JobGraphs it has stored in the SubmittedJobGraphStore
    if it loses leadership. This ensures that the newly elected leader after recovering
    the jobs can remove them from the SubmittedJobGraphStore. Before, the problem was
    that a former leader might still be connected to ZooKeeper which keeps its ephemeral
    lock nodes alive. This could prevent the deletion of the JobGraph from ZooKeeper.
    The problem occurs in particular in multi stand-by Dispatcher scenarios.
    
    This closes #6587.
---
 .../flink/runtime/dispatcher/Dispatcher.java       |  59 +++---
 .../dispatcher/NoOpSubmittedJobGraphListener.java  |  40 ++++
 .../runtime/dispatcher/TestingDispatcher.java      |  13 ++
 .../dispatcher/ZooKeeperHADispatcherTest.java      | 203 +++++++++++++++++++++
 4 files changed, 293 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c96acbd..c31e64c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -572,30 +572,38 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 
 		return jobManagerRunnerTerminationFuture.thenRunAsync(
-			() -> {
-				jobManagerMetricGroup.removeJob(jobId);
+			() -> cleanUpJobData(jobId, cleanupHA),
+			getRpcService().getExecutor());
+	}
 
-				boolean cleanupHABlobs = false;
-				if (cleanupHA) {
-					try {
-						submittedJobGraphStore.removeJobGraph(jobId);
+	private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
+		jobManagerMetricGroup.removeJob(jobId);
 
-						// only clean up the HA blobs if we could remove the job from HA storage
-						cleanupHABlobs = true;
-					} catch (Exception e) {
-						log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e);
-					}
+		boolean cleanupHABlobs = false;
+		if (cleanupHA) {
+			try {
+				submittedJobGraphStore.removeJobGraph(jobId);
 
-					try {
-						runningJobsRegistry.clearJob(jobId);
-					} catch (IOException e) {
-						log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e);
-					}
-				}
+				// only clean up the HA blobs if we could remove the job from HA storage
+				cleanupHABlobs = true;
+			} catch (Exception e) {
+				log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e);
+			}
 
-				blobServer.cleanupJob(jobId, cleanupHABlobs);
-			},
-			getRpcService().getExecutor());
+			try {
+				runningJobsRegistry.clearJob(jobId);
+			} catch (IOException e) {
+				log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e);
+			}
+		} else {
+			try {
+				submittedJobGraphStore.releaseJobGraph(jobId);
+			} catch (Exception e) {
+				log.warn("Could not properly release job {} from submitted job graph store.", jobId, e);
+			}
+		}
+
+		blobServer.cleanupJob(jobId, cleanupHABlobs);
 	}
 
 	/**
@@ -806,8 +814,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
-		final CompletableFuture<Void> jobManagerTerminationFuture = jobManagerTerminationFutures
-			.getOrDefault(jobId, CompletableFuture.completedFuture(null))
+		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
 			.exceptionally((Throwable throwable) -> {
 				throw new CompletionException(
 					new DispatcherException(
@@ -822,6 +829,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			getMainThreadExecutor());
 	}
 
+	protected CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+		if (jobManagerRunners.containsKey(jobId)) {
+			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
+		} else {
+			return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
+		}
+	}
+
 	private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
 		// clear the state if we've been the leader before
 		if (getFencingToken() != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java
new file mode 100644
index 0000000..493534d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+
+/**
+ * No operation {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}
+ * implementation for testing purposes.
+ */
+public enum NoOpSubmittedJobGraphListener implements SubmittedJobGraphStore.SubmittedJobGraphListener {
+	INSTANCE;
+
+	@Override
+	public void onAddedJobGraph(JobID jobId) {
+		// No op
+	}
+
+	@Override
+	public void onRemovedJobGraph(JobID jobId) {
+		// No op
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index f5091ea..5141be0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -29,8 +31,12 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
 /**
  * {@link Dispatcher} implementation used for testing purposes.
  */
@@ -72,4 +78,11 @@ class TestingDispatcher extends Dispatcher {
 		runAsync(
 			() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
 	}
+
+	@VisibleForTesting
+	public CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
+		return callAsyncWithoutFencing(
+			() -> getJobTerminationFuture(jobId),
+			timeout).thenCompose(Function.identity());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
new file mode 100644
index 0000000..dd03758
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test cases for the interaction between ZooKeeper HA and the {@link Dispatcher}.
+ */
+public class ZooKeeperHADispatcherTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.seconds(10L);
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static Configuration configuration;
+
+	private static TestingRpcService rpcService;
+
+	private static BlobServer blobServer;
+
+	@Rule
+	public TestName name = new TestName();
+
+	private TestingFatalErrorHandler testingFatalErrorHandler;
+
+	@BeforeClass
+	public static void setupClass() throws IOException {
+		configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+		rpcService = new TestingRpcService();
+		blobServer = new BlobServer(configuration, new VoidBlobStore());
+	}
+
+	@Before
+	public void setup() {
+		testingFatalErrorHandler = new TestingFatalErrorHandler();
+	}
+
+	@After
+	public void teardown() throws Exception {
+		if (testingFatalErrorHandler != null) {
+			testingFatalErrorHandler.rethrowError();
+		}
+	}
+
+	@AfterClass
+	public static void teardownClass() throws Exception {
+		if (rpcService != null) {
+			RpcUtils.terminateRpcService(rpcService, TIMEOUT);
+			rpcService = null;
+		}
+
+		if (blobServer != null) {
+			blobServer.close();
+			blobServer = null;
+		}
+	}
+
+	/**
+	 * Tests that the {@link Dispatcher} releases a locked {@link SubmittedJobGraph} when it
+	 * loses leadership, so that another store instance can remove the job graph afterwards.
+	 */
+	@Test
+	public void testSubmittedJobGraphRelease() throws Exception {
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
+
+		try (final TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices()) {
+			testingHighAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
+
+			final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(
+				otherClient,
+				configuration);
+
+			otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+			testingHighAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+			final TestingDispatcher dispatcher = createDispatcher(testingHighAvailabilityServices);
+
+			dispatcher.start();
+
+			try {
+				final DispatcherId expectedLeaderId = DispatcherId.generate();
+				leaderElectionService.isLeader(expectedLeaderId.toUUID()).get();
+
+				final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+				final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+				final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT);
+				submissionFuture.get();
+
+				Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+				final JobID jobId = nonEmptyJobGraph.getJobID();
+				assertThat(jobIds, Matchers.contains(jobId));
+
+				leaderElectionService.notLeader();
+
+				// wait for the job to properly terminate
+				final CompletableFuture<Void> jobTerminationFuture = dispatcher.getJobTerminationFuture(jobId, TIMEOUT);
+				jobTerminationFuture.get();
+
+				// recover the job
+				final SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);
+
+				assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));
+
+				// check that the other submitted job graph store can remove the job graph after the original leader
+				// has lost its leadership
+				otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+				jobIds = otherSubmittedJobGraphStore.getJobIds();
+				// hasItem, not contains: not(contains(x)) would also pass if x were still present next to other ids
+				assertThat(jobIds, Matchers.not(Matchers.hasItem(jobId)));
+			} finally {
+				RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+				client.close();
+				otherClient.close();
+			}
+		}
+	}
+
+	@Nonnull
+	private TestingDispatcher createDispatcher(TestingHighAvailabilityServices testingHighAvailabilityServices) throws Exception {
+		return new TestingDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
+			configuration,
+			testingHighAvailabilityServices,
+			new TestingResourceManagerGateway(),
+			blobServer,
+			new HeartbeatServices(1000L, 1000L),
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+			null,
+			new MemoryArchivedExecutionGraphStore(),
+			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+			testingFatalErrorHandler);
+	}
+}