You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/18 10:11:53 UTC
[flink] branch master updated: [FLINK-11383][blob] Clean up blobs of failed submissions
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b96add3 [FLINK-11383][blob] Clean up blobs of failed submissions
b96add3 is described below
commit b96add35cf656894c86fbf9e17e7adeef4772a71
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 17 16:44:16 2019 +0100
[FLINK-11383][blob] Clean up blobs of failed submissions
Let the dispatcher clean up blobs of failed submissions.
---
.../flink/runtime/dispatcher/Dispatcher.java | 8 +++++
.../dispatcher/DispatcherResourceCleanupTest.java | 34 ++++++++++++++++++++-
.../dispatcher/TestingJobManagerRunnerFactory.java | 35 +++++++++++++++++-----
3 files changed, 69 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1d83ba9..2330180 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -236,6 +236,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
+ return internalSubmitJob(jobGraph).whenCompleteAsync((acknowledge, throwable) -> {
+ if (throwable != null) {
+ cleanUpJobData(jobGraph.getJobID(), true);
+ }
+ }, getRpcService().getExecutor());
+ }
+
+ private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
final JobID jobId = jobGraph.getJobID();
log.info("Submitting job {} ({}).", jobId, jobGraph.getName());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 5c4ac34..5d47e2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
+import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
@@ -45,6 +46,8 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraph
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -67,6 +70,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@@ -114,6 +119,8 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private File blobFile;
+ private AtomicReference<Supplier<Exception>> failJobMasterCreationWith;
+
private CompletableFuture<BlobKey> storedHABlobFuture;
private CompletableFuture<JobID> deleteAllHABlobsFuture;
private CompletableFuture<ArchivedExecutionGraph> resultFuture;
@@ -162,12 +169,15 @@ public class DispatcherResourceCleanupTest extends TestLogger {
// upload a blob to the blob server
permanentBlobKey = blobServer.putPermanent(jobId, new byte[256]);
+ jobGraph.addUserJarBlobKey(permanentBlobKey);
blobFile = blobServer.getStorageLocation(jobId, permanentBlobKey);
resultFuture = new CompletableFuture<>();
fatalErrorHandler = new TestingFatalErrorHandler();
+ failJobMasterCreationWith = new AtomicReference<>();
+
dispatcher = new TestingDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
@@ -179,7 +189,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
null,
new MemoryArchivedExecutionGraphStore(),
- new TestingJobManagerRunnerFactory(new CompletableFuture<>(), resultFuture, terminationFuture),
+ new TestingJobManagerRunnerFactory(new CompletableFuture<>(), resultFuture, terminationFuture, failJobMasterCreationWith),
fatalErrorHandler);
dispatcher.start();
@@ -221,6 +231,10 @@ public class DispatcherResourceCleanupTest extends TestLogger {
resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobId).setState(JobStatus.FINISHED).build());
terminationFuture.complete(null);
+ assertThatHABlobsHaveBeenRemoved();
+ }
+
+ private void assertThatHABlobsHaveBeenRemoved() throws InterruptedException, ExecutionException {
assertThat(cleanupJobFuture.get(), equalTo(jobId));
// verify that we also cleared the BlobStore
@@ -257,6 +271,24 @@ public class DispatcherResourceCleanupTest extends TestLogger {
assertThat(deleteAllHABlobsFuture.isDone(), is(false));
}
+ /**
+ * Tests that the uploaded blobs are being cleaned up in case of a job submission failure.
+ */
+ @Test
+ public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception {
+ failJobMasterCreationWith.set(() -> new FlinkException("Test exception."));
+ final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+
+ try {
+ submissionFuture.get();
+ fail("Job submission was expected to fail.");
+ } catch (ExecutionException ee) {
+ assertThat(ExceptionUtils.findThrowable(ee, JobSubmissionException.class).isPresent(), is(true));
+ }
+
+ assertThatHABlobsHaveBeenRemoved();
+ }
+
@Test
public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
submitJob();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 992f087..63574cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -32,6 +32,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -45,11 +47,24 @@ class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFacto
private final CompletableFuture<JobGraph> jobGraphFuture;
private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
private final CompletableFuture<Void> terminationFuture;
+ private final AtomicReference<Supplier<Exception>> failJobMasterCreationWith;
- TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture, CompletableFuture<Void> terminationFuture) {
+ TestingJobManagerRunnerFactory(
+ CompletableFuture<JobGraph> jobGraphFuture,
+ CompletableFuture<ArchivedExecutionGraph> resultFuture,
+ CompletableFuture<Void> terminationFuture) {
+ this(jobGraphFuture, resultFuture, terminationFuture, new AtomicReference<>());
+ }
+
+ TestingJobManagerRunnerFactory(
+ CompletableFuture<JobGraph> jobGraphFuture,
+ CompletableFuture<ArchivedExecutionGraph> resultFuture,
+ CompletableFuture<Void> terminationFuture,
+ AtomicReference<Supplier<Exception>> failJobMasterCreationWith) {
this.jobGraphFuture = jobGraphFuture;
this.resultFuture = resultFuture;
this.terminationFuture = terminationFuture;
+ this.failJobMasterCreationWith = failJobMasterCreationWith;
}
@Override
@@ -64,13 +79,19 @@ class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFacto
JobManagerSharedServices jobManagerSharedServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler) throws Exception {
- jobGraphFuture.complete(jobGraph);
+ final Supplier<Exception> exceptionSupplier = failJobMasterCreationWith.get();
+
+ if (exceptionSupplier != null) {
+ throw exceptionSupplier.get();
+ } else {
+ jobGraphFuture.complete(jobGraph);
- final JobManagerRunner mock = mock(JobManagerRunner.class);
- when(mock.getResultFuture()).thenReturn(resultFuture);
- when(mock.closeAsync()).thenReturn(terminationFuture);
- when(mock.getJobGraph()).thenReturn(jobGraph);
+ final JobManagerRunner mock = mock(JobManagerRunner.class);
+ when(mock.getResultFuture()).thenReturn(resultFuture);
+ when(mock.closeAsync()).thenReturn(terminationFuture);
+ when(mock.getJobGraph()).thenReturn(jobGraph);
- return mock;
+ return mock;
+ }
}
}