You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/18 13:51:45 UTC

[2/2] flink git commit: [FLINK-9575] Remove job-related BLOBs only if the job was removed successfully

[FLINK-9575] Remove job-related BLOBs only if the job was removed successfully

This closes #6322.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6b2e8c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6b2e8c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6b2e8c5

Branch: refs/heads/master
Commit: f6b2e8c5ff0304e4835d2dc8c792a0d055679603
Parents: 5735fab
Author: Wosin <bl...@gmail.com>
Authored: Wed Jul 4 10:27:54 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 18 15:51:23 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  5 +--
 .../runtime/dispatcher/DispatcherTest.java      | 46 ++++++++++++++++++++
 3 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5306d6f..0aa9dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -574,11 +574,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		return jobManagerRunnerTerminationFuture.thenRunAsync(
 			() -> {
 				jobManagerMetricGroup.removeJob(jobId);
-				blobServer.cleanupJob(jobId, cleanupHA);
 
 				if (cleanupHA) {
 					try {
 						submittedJobGraphStore.removeJobGraph(jobId);
+						blobServer.cleanupJob(jobId, cleanupHA);
 					} catch (Exception e) {
 						log.warn("Could not properly remove job {} from submitted job graph store.", jobId);
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1c8174f..94469a8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1734,6 +1734,8 @@ class JobManager(
               // and the ZooKeeper client is closed. Not removing the job immediately allow the
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
+              val result  = blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+
             } catch {
               case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
             }
@@ -1759,10 +1761,7 @@ class JobManager(
       case None => None
     }
 
-    // remove all job-related BLOBs from local and HA store
     libraryCacheManager.unregisterJob(jobID)
-    blobServer.cleanupJob(jobID, removeJobFromStateBackend)
-
     jobManagerMetricGroup.removeJob(jobID)
 
     futureOption

http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 745e9cb..ac4f1a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
@@ -72,6 +73,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
@@ -83,6 +85,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collection;
@@ -125,6 +128,9 @@ public class DispatcherTest extends TestLogger {
 	@Rule
 	public TestName name = new TestName();
 
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
 	private JobGraph jobGraph;
 
 	private TestingFatalErrorHandler fatalErrorHandler;
@@ -295,6 +301,30 @@ public class DispatcherTest extends TestLogger {
 	}
 
 	@Test
+	public void testBlobsAreRemovedOnlyIfJobIsRemovedProperly() throws Exception {
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+		PermanentBlobKey key = blobServer.putPermanent(TEST_JOB_ID, new byte[128]);
+		submittedJobGraphStore.setRemovalFailure(new Exception("Failed to Remove future"));
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+		// Archived graph in CANCELED state for the submitted job; passed to completeJobExecution below.
+		ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder()
+			.setJobID(TEST_JOB_ID)
+			.setState(JobStatus.CANCELED)
+			.build();
+		// NOTE(review): if this path ends in Dispatcher's thenRunAsync cleanup, removal is asynchronous; confirm it finished before asserting.
+		dispatcher.completeJobExecution(executionGraph);
+		// The blob must still exist: the injected job graph removal failure should have prevented BLOB cleanup.
+		assertThat(blobServer.getFile(TEST_JOB_ID, key), notNullValue(File.class));
+		submittedJobGraphStore.setRemovalFailure(null);
+		dispatcher.completeJobExecution(executionGraph);
+
+		// Job graph removal no longer fails, so the blob is expected to be deleted and getFile to throw NoSuchFileException.
+		expectedException.expect(NoSuchFileException.class);
+		blobServer.getFile(TEST_JOB_ID, key);
+	}
+
+	@Test
 	public void testOnAddedJobGraphRecoveryFailure() throws Exception {
 		final FlinkException expectedFailure = new FlinkException("Expected failure");
 		submittedJobGraphStore.setRecoveryFailure(expectedFailure);
@@ -619,10 +649,17 @@ public class DispatcherTest extends TestLogger {
 		@Nullable
 		private Exception recoveryFailure = null;
 
+		@Nullable
+		private Exception removalFailure = null;
+
 		void setRecoveryFailure(@Nullable Exception recoveryFailure) {
 			this.recoveryFailure = recoveryFailure;
 		}
 
+		void setRemovalFailure(@Nullable Exception removalFailure) {
+			this.removalFailure = removalFailure;
+		}
+
 		@Override
 		public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
 			if (recoveryFailure != null) {
@@ -631,5 +668,14 @@ public class DispatcherTest extends TestLogger {
 				return super.recoverJobGraph(jobId);
 			}
 		}
+
+		@Override
+		public synchronized void removeJobGraph(JobID jobId) throws Exception {
+			// Throw the injected failure, if any; otherwise delegate to the superclass implementation.
+			if (removalFailure != null) {
+				throw removalFailure;
+			}
+			super.removeJobGraph(jobId);
+		}
 	}
 }