You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/18 13:51:44 UTC
[1/2] flink git commit: [FLINK-9575][tests] Simplify
DispatcherTest#testBlobsAreRemovedOnlyIfJobIsRemovedProperly
Repository: flink
Updated Branches:
refs/heads/master 5735fabff -> e984168e2
[FLINK-9575][tests] Simplify DispatcherTest#testBlobsAreRemovedOnlyIfJobIsRemovedProperly
Move DispatcherTest#testBlobsAreRemovedOnlyIfJobIsRemovedProperly into DispatcherResourceCleanupTest
and split it up into a success and failure case.
Moreover, this commit changes the logic of blob cleanup to also clean up locally in case of a removal
failure of a job from a SubmittedJobGraphStore.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e984168e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e984168e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e984168e
Branch: refs/heads/master
Commit: e984168e2eca59c08da90bd5feeac458eaa91bed
Parents: f6b2e8c
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 18 13:49:09 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 18 15:51:23 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 11 ++-
.../flink/runtime/jobmanager/JobManager.scala | 31 ++++---
.../DispatcherResourceCleanupTest.java | 89 +++++++++++---------
.../runtime/dispatcher/DispatcherTest.java | 70 +--------------
.../FaultySubmittedJobGraphStore.java | 64 ++++++++++++++
5 files changed, 142 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e984168e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 0aa9dfc..c96acbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -575,20 +575,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
() -> {
jobManagerMetricGroup.removeJob(jobId);
+ boolean cleanupHABlobs = false;
if (cleanupHA) {
try {
submittedJobGraphStore.removeJobGraph(jobId);
- blobServer.cleanupJob(jobId, cleanupHA);
+
+ // only clean up the HA blobs if we could remove the job from HA storage
+ cleanupHABlobs = true;
} catch (Exception e) {
- log.warn("Could not properly remove job {} from submitted job graph store.", jobId);
+ log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e);
}
try {
runningJobsRegistry.clearJob(jobId);
} catch (IOException e) {
- log.warn("Could not properly remove job {} from the running jobs registry.", jobId);
+ log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e);
}
}
+
+ blobServer.cleanupJob(jobId, cleanupHABlobs);
},
getRpcService().getExecutor());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e984168e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 94469a8..2a8f492 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1725,22 +1725,31 @@ class JobManager(
*/
private def removeJob(jobID: JobID, removeJobFromStateBackend: Boolean): Option[Future[Unit]] = {
// Don't remove the job yet...
- val futureOption = currentJobs.get(jobID) match {
+ val futureOption = currentJobs.remove(jobID) match {
case Some((eg, _)) =>
- val result = if (removeJobFromStateBackend) {
- val futureOption = Some(future {
+ val cleanUpFuture: Future[Unit] = Future {
+ val cleanupHABlobs = if (removeJobFromStateBackend) {
try {
// ...otherwise, we can have lingering resources when there is a concurrent shutdown
// and the ZooKeeper client is closed. Not removing the job immediately allow the
// shutdown to release all resources.
submittedJobGraphs.removeJobGraph(jobID)
- val result = blobServer.cleanupJob(jobID, removeJobFromStateBackend)
-
+ true
} catch {
- case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
+ case t: Throwable => {
+ log.warn(s"Could not remove submitted job graph $jobID.", t)
+ false
+ }
}
- }(context.dispatcher))
+ } else {
+ false
+ }
+ blobServer.cleanupJob(jobID, cleanupHABlobs)
+ ()
+ }(context.dispatcher)
+
+ if (removeJobFromStateBackend) {
try {
archive ! decorateMessage(
ArchiveExecutionGraph(
@@ -1749,15 +1758,9 @@ class JobManager(
} catch {
case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
}
-
- futureOption
- } else {
- None
}
- currentJobs.remove(jobID)
-
- result
+ Option(cleanUpFuture)
case None => None
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e984168e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index e42b14a..d09ab8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -38,23 +38,19 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -64,11 +60,12 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
@@ -93,6 +90,9 @@ public class DispatcherResourceCleanupTest extends TestLogger {
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
private static final Time timeout = Time.seconds(10L);
private static TestingRpcService rpcService;
@@ -123,11 +123,12 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private File blobFile;
- private CompletableFuture<BlobKey> storedBlobFuture;
- private CompletableFuture<JobID> deleteAllFuture;
+ private CompletableFuture<BlobKey> storedHABlobFuture;
+ private CompletableFuture<JobID> deleteAllHABlobsFuture;
private CompletableFuture<ArchivedExecutionGraph> resultFuture;
private CompletableFuture<JobID> cleanupJobFuture;
private CompletableFuture<Void> terminationFuture;
+ private FaultySubmittedJobGraphStore submittedJobGraphStore;
@BeforeClass
public static void setupClass() {
@@ -151,15 +152,16 @@ public class DispatcherResourceCleanupTest extends TestLogger {
clearedJobLatch = new OneShotLatch();
runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
- highAvailabilityServices.setSubmittedJobGraphStore(new InMemorySubmittedJobGraphStore());
+ submittedJobGraphStore = new FaultySubmittedJobGraphStore();
+ highAvailabilityServices.setSubmittedJobGraphStore(submittedJobGraphStore);
- storedBlobFuture = new CompletableFuture<>();
- deleteAllFuture = new CompletableFuture<>();
+ storedHABlobFuture = new CompletableFuture<>();
+ deleteAllHABlobsFuture = new CompletableFuture<>();
final TestingBlobStore testingBlobStore = new TestingBlobStoreBuilder()
.setPutFunction(
- putArguments -> storedBlobFuture.complete(putArguments.f2))
- .setDeleteAllFunction(deleteAllFuture::complete)
+ putArguments -> storedHABlobFuture.complete(putArguments.f2))
+ .setDeleteAllFunction(deleteAllHABlobsFuture::complete)
.createTestingBlobStore();
cleanupJobFuture = new CompletableFuture<>();
@@ -180,7 +182,6 @@ public class DispatcherResourceCleanupTest extends TestLogger {
Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
configuration,
highAvailabilityServices,
- highAvailabilityServices.getSubmittedJobGraphStore(),
new TestingResourceManagerGateway(),
blobServer,
new HeartbeatServices(1000L, 1000L),
@@ -199,7 +200,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
assertThat(blobFile.exists(), is(true));
// verify that we stored the blob also in the BlobStore
- assertThat(storedBlobFuture.get(), equalTo(permanentBlobKey));
+ assertThat(storedHABlobFuture.get(), equalTo(permanentBlobKey));
}
@After
@@ -232,7 +233,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
assertThat(cleanupJobFuture.get(), equalTo(jobId));
// verify that we also cleared the BlobStore
- assertThat(deleteAllFuture.get(), equalTo(jobId));
+ assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
assertThat(blobFile.exists(), is(false));
}
@@ -256,13 +257,13 @@ public class DispatcherResourceCleanupTest extends TestLogger {
// verify that we did not clear the BlobStore
try {
- deleteAllFuture.get(50L, TimeUnit.MILLISECONDS);
+ deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
fail("We should not delete the HA blobs.");
} catch (TimeoutException ignored) {
// expected
}
- assertThat(deleteAllFuture.isDone(), is(false));
+ assertThat(deleteAllHABlobsFuture.isDone(), is(false));
}
@Test
@@ -279,13 +280,13 @@ public class DispatcherResourceCleanupTest extends TestLogger {
// verify that we did not clear the BlobStore
try {
- deleteAllFuture.get(50L, TimeUnit.MILLISECONDS);
+ deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
fail("We should not delete the HA blobs.");
} catch (TimeoutException ignored) {
// expected
}
- assertThat(deleteAllFuture.isDone(), is(false));
+ assertThat(deleteAllHABlobsFuture.isDone(), is(false));
}
/**
@@ -413,25 +414,37 @@ public class DispatcherResourceCleanupTest extends TestLogger {
}
}
- private static final class TestingDispatcher extends Dispatcher {
- TestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
- super(
- rpcService,
- endpointId,
- configuration,
- highAvailabilityServices,
- submittedJobGraphStore,
- resourceManagerGateway,
- blobServer,
- heartbeatServices,
- jobManagerMetricGroup,
- metricServiceQueryPath,
- archivedExecutionGraphStore,
- jobManagerRunnerFactory,
- fatalErrorHandler,
- null,
- VoidHistoryServerArchivist.INSTANCE);
- }
+ @Test
+ public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws Exception {
+ submittedJobGraphStore.setRemovalFailure(new Exception("Failed to Remove future"));
+ submitJob();
+
+ ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder()
+ .setJobID(jobId)
+ .setState(JobStatus.CANCELED)
+ .build();
+
+ resultFuture.complete(executionGraph);
+ terminationFuture.complete(null);
+
+ assertThat(cleanupJobFuture.get(), equalTo(jobId));
+ assertThat(deleteAllHABlobsFuture.isDone(), is(false));
+ }
+
+ @Test
+ public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception {
+ submitJob();
+
+ ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder()
+ .setJobID(jobId)
+ .setState(JobStatus.CANCELED)
+ .build();
+
+ resultFuture.complete(executionGraph);
+ terminationFuture.complete(null);
+
+ assertThat(cleanupJobFuture.get(), equalTo(jobId));
+ assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
}
private static final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
http://git-wip-us.apache.org/repos/asf/flink/blob/e984168e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ac4f1a8..d405fcd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
@@ -61,7 +60,6 @@ import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -73,19 +71,16 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
@@ -128,14 +123,11 @@ public class DispatcherTest extends TestLogger {
@Rule
public TestName name = new TestName();
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
private JobGraph jobGraph;
private TestingFatalErrorHandler fatalErrorHandler;
- private FailableSubmittedJobGraphStore submittedJobGraphStore;
+ private FaultySubmittedJobGraphStore submittedJobGraphStore;
private TestingLeaderElectionService dispatcherLeaderElectionService;
@@ -175,7 +167,7 @@ public class DispatcherTest extends TestLogger {
fatalErrorHandler = new TestingFatalErrorHandler();
final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
- submittedJobGraphStore = new FailableSubmittedJobGraphStore();
+ submittedJobGraphStore = new FaultySubmittedJobGraphStore();
dispatcherLeaderElectionService = new TestingLeaderElectionService();
jobMasterLeaderElectionService = new TestingLeaderElectionService();
@@ -301,30 +293,6 @@ public class DispatcherTest extends TestLogger {
}
@Test
- public void testBlobsAreRemovedOnlyIfJobIsRemovedProperly() throws Exception {
- dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
- PermanentBlobKey key = blobServer.putPermanent(TEST_JOB_ID, new byte[128]);
- submittedJobGraphStore.setRemovalFailure(new Exception("Failed to Remove future"));
- final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
- dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
-
- ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder()
- .setJobID(TEST_JOB_ID)
- .setState(JobStatus.CANCELED)
- .build();
-
- dispatcher.completeJobExecution(executionGraph);
- //Assert that blob was not removed, since exception was thrown while removing the job
- assertThat(blobServer.getFile(TEST_JOB_ID, key), notNullValue(File.class));
- submittedJobGraphStore.setRemovalFailure(null);
- dispatcher.completeJobExecution(executionGraph);
-
- //Job removing did not throw exception now, blob should be null
- expectedException.expect(NoSuchFileException.class);
- blobServer.getFile(TEST_JOB_ID, key);
- }
-
- @Test
public void testOnAddedJobGraphRecoveryFailure() throws Exception {
final FlinkException expectedFailure = new FlinkException("Expected failure");
submittedJobGraphStore.setRecoveryFailure(expectedFailure);
@@ -644,38 +612,4 @@ public class DispatcherTest extends TestLogger {
}
}
- private static final class FailableSubmittedJobGraphStore extends InMemorySubmittedJobGraphStore {
-
- @Nullable
- private Exception recoveryFailure = null;
-
- @Nullable
- private Exception removalFailure = null;
-
- void setRecoveryFailure(@Nullable Exception recoveryFailure) {
- this.recoveryFailure = recoveryFailure;
- }
-
- void setRemovalFailure(@Nullable Exception removalFailure) {
- this.removalFailure = removalFailure;
- }
-
- @Override
- public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
- if (recoveryFailure != null) {
- throw recoveryFailure;
- } else {
- return super.recoverJobGraph(jobId);
- }
- }
-
- @Override
- public synchronized void removeJobGraph(JobID jobId) throws Exception {
- if (removalFailure != null) {
- throw removalFailure;
- } else {
- super.removeJobGraph(jobId);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e984168e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FaultySubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FaultySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FaultySubmittedJobGraphStore.java
new file mode 100644
index 0000000..9238ec3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FaultySubmittedJobGraphStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link InMemorySubmittedJobGraphStore} implementation which can throw artifical errors for
+ * testing purposes.
+ */
+final class FaultySubmittedJobGraphStore extends InMemorySubmittedJobGraphStore {
+
+ @Nullable
+ private Exception recoveryFailure = null;
+
+ @Nullable
+ private Exception removalFailure = null;
+
+ void setRecoveryFailure(@Nullable Exception recoveryFailure) {
+ this.recoveryFailure = recoveryFailure;
+ }
+
+ void setRemovalFailure(@Nullable Exception removalFailure) {
+ this.removalFailure = removalFailure;
+ }
+
+ @Override
+ public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+ if (recoveryFailure != null) {
+ throw recoveryFailure;
+ } else {
+ return super.recoverJobGraph(jobId);
+ }
+ }
+
+ @Override
+ public synchronized void removeJobGraph(JobID jobId) throws Exception {
+ if (removalFailure != null) {
+ throw removalFailure;
+ } else {
+ super.removeJobGraph(jobId);
+ }
+ }
+}
[2/2] flink git commit: [FLINK-9575] Remove job-related BLOBS only if
the job was removed successfully
Posted by tr...@apache.org.
[FLINK-9575] Remove job-related BLOBS only if the job was removed successfully
This closes #6322.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6b2e8c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6b2e8c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6b2e8c5
Branch: refs/heads/master
Commit: f6b2e8c5ff0304e4835d2dc8c792a0d055679603
Parents: 5735fab
Author: Wosin <bl...@gmail.com>
Authored: Wed Jul 4 10:27:54 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 18 15:51:23 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 2 +-
.../flink/runtime/jobmanager/JobManager.scala | 5 +--
.../runtime/dispatcher/DispatcherTest.java | 46 ++++++++++++++++++++
3 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5306d6f..0aa9dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -574,11 +574,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
return jobManagerRunnerTerminationFuture.thenRunAsync(
() -> {
jobManagerMetricGroup.removeJob(jobId);
- blobServer.cleanupJob(jobId, cleanupHA);
if (cleanupHA) {
try {
submittedJobGraphStore.removeJobGraph(jobId);
+ blobServer.cleanupJob(jobId, cleanupHA);
} catch (Exception e) {
log.warn("Could not properly remove job {} from submitted job graph store.", jobId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1c8174f..94469a8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1734,6 +1734,8 @@ class JobManager(
// and the ZooKeeper client is closed. Not removing the job immediately allow the
// shutdown to release all resources.
submittedJobGraphs.removeJobGraph(jobID)
+ val result = blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+
} catch {
case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
}
@@ -1759,10 +1761,7 @@ class JobManager(
case None => None
}
- // remove all job-related BLOBs from local and HA store
libraryCacheManager.unregisterJob(jobID)
- blobServer.cleanupJob(jobID, removeJobFromStateBackend)
-
jobManagerMetricGroup.removeJob(jobID)
futureOption
http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 745e9cb..ac4f1a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
@@ -72,6 +73,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
@@ -83,6 +85,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
@@ -125,6 +128,9 @@ public class DispatcherTest extends TestLogger {
@Rule
public TestName name = new TestName();
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
private JobGraph jobGraph;
private TestingFatalErrorHandler fatalErrorHandler;
@@ -295,6 +301,30 @@ public class DispatcherTest extends TestLogger {
}
@Test
+ public void testBlobsAreRemovedOnlyIfJobIsRemovedProperly() throws Exception {
+ dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+ PermanentBlobKey key = blobServer.putPermanent(TEST_JOB_ID, new byte[128]);
+ submittedJobGraphStore.setRemovalFailure(new Exception("Failed to Remove future"));
+ final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder()
+ .setJobID(TEST_JOB_ID)
+ .setState(JobStatus.CANCELED)
+ .build();
+
+ dispatcher.completeJobExecution(executionGraph);
+ //Assert that blob was not removed, since exception was thrown while removing the job
+ assertThat(blobServer.getFile(TEST_JOB_ID, key), notNullValue(File.class));
+ submittedJobGraphStore.setRemovalFailure(null);
+ dispatcher.completeJobExecution(executionGraph);
+
+ //Job removing did not throw exception now, blob should be null
+ expectedException.expect(NoSuchFileException.class);
+ blobServer.getFile(TEST_JOB_ID, key);
+ }
+
+ @Test
public void testOnAddedJobGraphRecoveryFailure() throws Exception {
final FlinkException expectedFailure = new FlinkException("Expected failure");
submittedJobGraphStore.setRecoveryFailure(expectedFailure);
@@ -619,10 +649,17 @@ public class DispatcherTest extends TestLogger {
@Nullable
private Exception recoveryFailure = null;
+ @Nullable
+ private Exception removalFailure = null;
+
void setRecoveryFailure(@Nullable Exception recoveryFailure) {
this.recoveryFailure = recoveryFailure;
}
+ void setRemovalFailure(@Nullable Exception removalFailure) {
+ this.removalFailure = removalFailure;
+ }
+
@Override
public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
if (recoveryFailure != null) {
@@ -631,5 +668,14 @@ public class DispatcherTest extends TestLogger {
return super.recoverJobGraph(jobId);
}
}
+
+ @Override
+ public synchronized void removeJobGraph(JobID jobId) throws Exception {
+ if (removalFailure != null) {
+ throw removalFailure;
+ } else {
+ super.removeJobGraph(jobId);
+ }
+ }
}
}