You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/18 13:51:45 UTC
[2/2] flink git commit: [FLINK-9575] Remove job-related BLOBs only if
the job was removed successfully
[FLINK-9575] Remove job-related BLOBs only if the job was removed successfully
This closes #6322.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6b2e8c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6b2e8c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6b2e8c5
Branch: refs/heads/master
Commit: f6b2e8c5ff0304e4835d2dc8c792a0d055679603
Parents: 5735fab
Author: Wosin <bl...@gmail.com>
Authored: Wed Jul 4 10:27:54 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 18 15:51:23 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 2 +-
.../flink/runtime/jobmanager/JobManager.scala | 5 +--
.../runtime/dispatcher/DispatcherTest.java | 46 ++++++++++++++++++++
3 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5306d6f..0aa9dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -574,11 +574,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
return jobManagerRunnerTerminationFuture.thenRunAsync(
() -> {
jobManagerMetricGroup.removeJob(jobId);
- blobServer.cleanupJob(jobId, cleanupHA);
if (cleanupHA) {
try {
submittedJobGraphStore.removeJobGraph(jobId);
+ blobServer.cleanupJob(jobId, cleanupHA);
} catch (Exception e) {
log.warn("Could not properly remove job {} from submitted job graph store.", jobId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1c8174f..94469a8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1734,6 +1734,8 @@ class JobManager(
// and the ZooKeeper client is closed. Not removing the job immediately allow the
// shutdown to release all resources.
submittedJobGraphs.removeJobGraph(jobID)
+ val result = blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+
} catch {
case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
}
@@ -1759,10 +1761,7 @@ class JobManager(
case None => None
}
- // remove all job-related BLOBs from local and HA store
libraryCacheManager.unregisterJob(jobID)
- blobServer.cleanupJob(jobID, removeJobFromStateBackend)
-
jobManagerMetricGroup.removeJob(jobID)
futureOption
http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 745e9cb..ac4f1a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
@@ -72,6 +73,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
@@ -83,6 +85,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
@@ -125,6 +128,9 @@ public class DispatcherTest extends TestLogger {
@Rule
public TestName name = new TestName();
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
private JobGraph jobGraph;
private TestingFatalErrorHandler fatalErrorHandler;
@@ -295,6 +301,30 @@ public class DispatcherTest extends TestLogger {
}
+ /**
+ * Tests that the permanent BLOB stored for {@code TEST_JOB_ID} is kept when removing the job
+ * graph from the submitted job graph store fails, and is deleted once a later removal succeeds.
+ *
+ * <p>NOTE(review): the Dispatcher performs this cleanup inside a {@code thenRunAsync} callback.
+ * If {@code completeJobExecution} can return before that callback has run, the first check may
+ * pass without the cleanup having happened, and the final check may be flaky. TODO: confirm the
+ * ordering guarantee.
+ */
@Test
+ public void testBlobsAreRemovedOnlyIfJobIsRemovedProperly() throws Exception {
+ dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+ PermanentBlobKey key = blobServer.putPermanent(TEST_JOB_ID, new byte[128]);
+ // Make every removeJobGraph call fail until the failure is cleared further below.
+ submittedJobGraphStore.setRemovalFailure(new Exception("Failed to Remove future"));
+ final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder()
+ .setJobID(TEST_JOB_ID)
+ .setState(JobStatus.CANCELED)
+ .build();
+
+ dispatcher.completeJobExecution(executionGraph);
+ // Removing the job graph failed, so the BLOB must still be present.
+ assertThat(blobServer.getFile(TEST_JOB_ID, key), notNullValue(File.class));
+ // Clear the failure so the next removal delegates to the real store.
+ submittedJobGraphStore.setRemovalFailure(null);
+ dispatcher.completeJobExecution(executionGraph);
+
+ // Removal succeeds this time, so the BLOB should be deleted and getFile should throw NoSuchFileException.
+ expectedException.expect(NoSuchFileException.class);
+ blobServer.getFile(TEST_JOB_ID, key);
+ }
+
+ @Test
public void testOnAddedJobGraphRecoveryFailure() throws Exception {
final FlinkException expectedFailure = new FlinkException("Expected failure");
submittedJobGraphStore.setRecoveryFailure(expectedFailure);
@@ -619,10 +649,17 @@ public class DispatcherTest extends TestLogger {
@Nullable
private Exception recoveryFailure = null;
+ @Nullable
+ private Exception removalFailure = null;
+
void setRecoveryFailure(@Nullable Exception recoveryFailure) {
this.recoveryFailure = recoveryFailure;
}
+ /**
+ * Sets the exception that {@link #removeJobGraph(JobID)} throws instead of removing the job graph.
+ *
+ * @param removalFailure exception to throw on removal, or {@code null} to restore normal removal
+ */
+ void setRemovalFailure(@Nullable Exception removalFailure) {
+ this.removalFailure = removalFailure;
+ }
+
@Override
public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
if (recoveryFailure != null) {
@@ -631,5 +668,14 @@ public class DispatcherTest extends TestLogger {
return super.recoverJobGraph(jobId);
}
}
+
+ /**
+ * Throws the configured removal failure if one is set; otherwise removes the job graph
+ * via the superclass implementation.
+ */
+ @Override
+ public synchronized void removeJobGraph(JobID jobId) throws Exception {
+ if (removalFailure != null) {
+ // The field is not reset here, so every call rethrows this same instance until it is cleared.
+ throw removalFailure;
+ } else {
+ super.removeJobGraph(jobId);
+ }
+ }
}
}