You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/18 10:11:53 UTC

[flink] branch master updated: [FLINK-11383][blob] Clean up blobs of failed submissions

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b96add3  [FLINK-11383][blob] Clean up blobs of failed submissions
b96add3 is described below

commit b96add35cf656894c86fbf9e17e7adeef4772a71
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 17 16:44:16 2019 +0100

    [FLINK-11383][blob] Clean up blobs of failed submissions
    
    Let the dispatcher clean up blobs of failed submissions.
---
 .../flink/runtime/dispatcher/Dispatcher.java       |  8 +++++
 .../dispatcher/DispatcherResourceCleanupTest.java  | 34 ++++++++++++++++++++-
 .../dispatcher/TestingJobManagerRunnerFactory.java | 35 +++++++++++++++++-----
 3 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1d83ba9..2330180 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -236,6 +236,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Override
 	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
+		return internalSubmitJob(jobGraph).whenCompleteAsync((acknowledge, throwable) -> {
+			if (throwable != null) {
+				cleanUpJobData(jobGraph.getJobID(), true);
+			}
+		}, getRpcService().getExecutor());
+	}
+
+	private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
 		final JobID jobId = jobGraph.getJobID();
 
 		log.info("Submitting job {} ({}).", jobId, jobGraph.getName());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 5c4ac34..5d47e2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.TestingBlobStore;
 import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
+import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
@@ -45,6 +46,8 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraph
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -67,6 +70,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -114,6 +119,8 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
 	private File blobFile;
 
+	private AtomicReference<Supplier<Exception>> failJobMasterCreationWith;
+
 	private CompletableFuture<BlobKey> storedHABlobFuture;
 	private CompletableFuture<JobID> deleteAllHABlobsFuture;
 	private CompletableFuture<ArchivedExecutionGraph> resultFuture;
@@ -162,12 +169,15 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
 		// upload a blob to the blob server
 		permanentBlobKey = blobServer.putPermanent(jobId, new byte[256]);
+		jobGraph.addUserJarBlobKey(permanentBlobKey);
 		blobFile = blobServer.getStorageLocation(jobId, permanentBlobKey);
 
 		resultFuture = new CompletableFuture<>();
 
 		fatalErrorHandler = new TestingFatalErrorHandler();
 
+		failJobMasterCreationWith = new AtomicReference<>();
+
 		dispatcher = new TestingDispatcher(
 			rpcService,
 			Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
@@ -179,7 +189,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), resultFuture, terminationFuture),
+			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), resultFuture, terminationFuture, failJobMasterCreationWith),
 			fatalErrorHandler);
 
 		dispatcher.start();
@@ -221,6 +231,10 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobId).setState(JobStatus.FINISHED).build());
 		terminationFuture.complete(null);
 
+		assertThatHABlobsHaveBeenRemoved();
+	}
+
+	private void assertThatHABlobsHaveBeenRemoved() throws InterruptedException, ExecutionException {
 		assertThat(cleanupJobFuture.get(), equalTo(jobId));
 
 		// verify that we also cleared the BlobStore
@@ -257,6 +271,24 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		assertThat(deleteAllHABlobsFuture.isDone(), is(false));
 	}
 
+	/**
+	 * Tests that the uploaded blobs are being cleaned up in case of a job submission failure.
+	 */
+	@Test
+	public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception {
+		failJobMasterCreationWith.set(() -> new FlinkException("Test exception."));
+		final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+
+		try {
+			submissionFuture.get();
+			fail("Job submission was expected to fail.");
+		} catch (ExecutionException ee) {
+			assertThat(ExceptionUtils.findThrowable(ee, JobSubmissionException.class).isPresent(), is(true));
+		}
+
+		assertThatHABlobsHaveBeenRemoved();
+	}
+
 	@Test
 	public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
 		submitJob();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 992f087..63574cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -32,6 +32,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -45,11 +47,24 @@ class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFacto
 	private final CompletableFuture<JobGraph> jobGraphFuture;
 	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
 	private final CompletableFuture<Void> terminationFuture;
+	private final AtomicReference<Supplier<Exception>> failJobMasterCreationWith;
 
-	TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture, CompletableFuture<Void> terminationFuture) {
+	TestingJobManagerRunnerFactory(
+			CompletableFuture<JobGraph> jobGraphFuture,
+			CompletableFuture<ArchivedExecutionGraph> resultFuture,
+			CompletableFuture<Void> terminationFuture) {
+		this(jobGraphFuture, resultFuture, terminationFuture, new AtomicReference<>());
+	}
+
+	TestingJobManagerRunnerFactory(
+			CompletableFuture<JobGraph> jobGraphFuture,
+			CompletableFuture<ArchivedExecutionGraph> resultFuture,
+			CompletableFuture<Void> terminationFuture,
+			AtomicReference<Supplier<Exception>> failJobMasterCreationWith) {
 		this.jobGraphFuture = jobGraphFuture;
 		this.resultFuture = resultFuture;
 		this.terminationFuture = terminationFuture;
+		this.failJobMasterCreationWith = failJobMasterCreationWith;
 	}
 
 	@Override
@@ -64,13 +79,19 @@ class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFacto
 			JobManagerSharedServices jobManagerSharedServices,
 			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
-		jobGraphFuture.complete(jobGraph);
+		final Supplier<Exception> exceptionSupplier = failJobMasterCreationWith.get();
+
+		if (exceptionSupplier != null) {
+			throw exceptionSupplier.get();
+		} else {
+			jobGraphFuture.complete(jobGraph);
 
-		final JobManagerRunner mock = mock(JobManagerRunner.class);
-		when(mock.getResultFuture()).thenReturn(resultFuture);
-		when(mock.closeAsync()).thenReturn(terminationFuture);
-		when(mock.getJobGraph()).thenReturn(jobGraph);
+			final JobManagerRunner mock = mock(JobManagerRunner.class);
+			when(mock.getResultFuture()).thenReturn(resultFuture);
+			when(mock.closeAsync()).thenReturn(terminationFuture);
+			when(mock.getJobGraph()).thenReturn(jobGraph);
 
-		return mock;
+			return mock;
+		}
 	}
 }