You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/18 13:51:44 UTC

[1/2] flink git commit: [FLINK-9575][tests] Simplify DispatcherTest#testBlobsAreRemovedOnlyIfJobIsRemovedProperly

Repository: flink
Updated Branches:
  refs/heads/master 5735fabff -> e984168e2


[FLINK-9575][tests] Simplify DispatcherTest#testBlobsAreRemovedOnlyIfJobIsRemovedProperly

Move DispatcherTest#testBlobsAreRemovedOnlyIfJobIsRemovedProperly into DispatcherResourceCleanupTest
and split it into a success case and a failure case.

Moreover, this commit changes the blob cleanup logic so that a job's blobs are still cleaned up
locally when removing the job from the SubmittedJobGraphStore fails; only the HA blobs are kept.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e984168e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e984168e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e984168e

Branch: refs/heads/master
Commit: e984168e2eca59c08da90bd5feeac458eaa91bed
Parents: f6b2e8c
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 18 13:49:09 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 18 15:51:23 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 11 ++-
 .../flink/runtime/jobmanager/JobManager.scala   | 31 ++++---
 .../DispatcherResourceCleanupTest.java          | 89 +++++++++++---------
 .../runtime/dispatcher/DispatcherTest.java      | 70 +--------------
 .../FaultySubmittedJobGraphStore.java           | 64 ++++++++++++++
 5 files changed, 142 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e984168e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 0aa9dfc..c96acbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -575,20 +575,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			() -> {
 				jobManagerMetricGroup.removeJob(jobId);
 
+				boolean cleanupHABlobs = false;
 				if (cleanupHA) {
 					try {
 						submittedJobGraphStore.removeJobGraph(jobId);
-						blobServer.cleanupJob(jobId, cleanupHA);
+
+						// only clean up the HA blobs if we could remove the job from HA storage
+						cleanupHABlobs = true;
 					} catch (Exception e) {
-						log.warn("Could not properly remove job {} from submitted job graph store.", jobId);
+						log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e);
 					}
 
 					try {
 						runningJobsRegistry.clearJob(jobId);
 					} catch (IOException e) {
-						log.warn("Could not properly remove job {} from the running jobs registry.", jobId);
+						log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e);
 					}
 				}
+
+				blobServer.cleanupJob(jobId, cleanupHABlobs);
 			},
 			getRpcService().getExecutor());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e984168e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 94469a8..2a8f492 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1725,22 +1725,31 @@ class JobManager(
    */
   private def removeJob(jobID: JobID, removeJobFromStateBackend: Boolean): Option[Future[Unit]] = {
     // Don't remove the job yet...
-    val futureOption = currentJobs.get(jobID) match {
+    val futureOption = currentJobs.remove(jobID) match {
       case Some((eg, _)) =>
-        val result = if (removeJobFromStateBackend) {
-          val futureOption = Some(future {
+        val cleanUpFuture: Future[Unit] = Future {
+          val cleanupHABlobs = if (removeJobFromStateBackend) {
             try {
               // ...otherwise, we can have lingering resources when there is a  concurrent shutdown
               // and the ZooKeeper client is closed. Not removing the job immediately allow the
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
-              val result  = blobServer.cleanupJob(jobID, removeJobFromStateBackend)
-
+              true
             } catch {
-              case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
+              case t: Throwable => {
+                log.warn(s"Could not remove submitted job graph $jobID.", t)
+                false
+              }
             }
-          }(context.dispatcher))
+          } else {
+            false
+          }
 
+          blobServer.cleanupJob(jobID, cleanupHABlobs)
+          ()
+        }(context.dispatcher)
+
+        if (removeJobFromStateBackend) {
           try {
             archive ! decorateMessage(
               ArchiveExecutionGraph(
@@ -1749,15 +1758,9 @@ class JobManager(
           } catch {
             case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
           }
-
-          futureOption
-        } else {
-          None
         }
 
-        currentJobs.remove(jobID)
-
-        result
+        Option(cleanUpFuture)
       case None => None
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e984168e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index e42b14a..d09ab8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -38,23 +38,19 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -64,11 +60,12 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -93,6 +90,9 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 	@ClassRule
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
 	private static final Time timeout = Time.seconds(10L);
 
 	private static TestingRpcService rpcService;
@@ -123,11 +123,12 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
 	private File blobFile;
 
-	private CompletableFuture<BlobKey> storedBlobFuture;
-	private CompletableFuture<JobID> deleteAllFuture;
+	private CompletableFuture<BlobKey> storedHABlobFuture;
+	private CompletableFuture<JobID> deleteAllHABlobsFuture;
 	private CompletableFuture<ArchivedExecutionGraph> resultFuture;
 	private CompletableFuture<JobID> cleanupJobFuture;
 	private CompletableFuture<Void> terminationFuture;
+	private FaultySubmittedJobGraphStore submittedJobGraphStore;
 
 	@BeforeClass
 	public static void setupClass() {
@@ -151,15 +152,16 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		clearedJobLatch = new OneShotLatch();
 		runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
 		highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
-		highAvailabilityServices.setSubmittedJobGraphStore(new InMemorySubmittedJobGraphStore());
+		submittedJobGraphStore = new FaultySubmittedJobGraphStore();
+		highAvailabilityServices.setSubmittedJobGraphStore(submittedJobGraphStore);
 
-		storedBlobFuture = new CompletableFuture<>();
-		deleteAllFuture = new CompletableFuture<>();
+		storedHABlobFuture = new CompletableFuture<>();
+		deleteAllHABlobsFuture = new CompletableFuture<>();
 
 		final TestingBlobStore testingBlobStore = new TestingBlobStoreBuilder()
 			.setPutFunction(
-				putArguments -> storedBlobFuture.complete(putArguments.f2))
-			.setDeleteAllFunction(deleteAllFuture::complete)
+				putArguments -> storedHABlobFuture.complete(putArguments.f2))
+			.setDeleteAllFunction(deleteAllHABlobsFuture::complete)
 			.createTestingBlobStore();
 
 		cleanupJobFuture = new CompletableFuture<>();
@@ -180,7 +182,6 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 			Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
 			configuration,
 			highAvailabilityServices,
-			highAvailabilityServices.getSubmittedJobGraphStore(),
 			new TestingResourceManagerGateway(),
 			blobServer,
 			new HeartbeatServices(1000L, 1000L),
@@ -199,7 +200,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		assertThat(blobFile.exists(), is(true));
 
 		// verify that we stored the blob also in the BlobStore
-		assertThat(storedBlobFuture.get(), equalTo(permanentBlobKey));
+		assertThat(storedHABlobFuture.get(), equalTo(permanentBlobKey));
 	}
 
 	@After
@@ -232,7 +233,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		assertThat(cleanupJobFuture.get(), equalTo(jobId));
 
 		// verify that we also cleared the BlobStore
-		assertThat(deleteAllFuture.get(), equalTo(jobId));
+		assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
 
 		assertThat(blobFile.exists(), is(false));
 	}
@@ -256,13 +257,13 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
 		// verify that we did not clear the BlobStore
 		try {
-			deleteAllFuture.get(50L, TimeUnit.MILLISECONDS);
+			deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
 			fail("We should not delete the HA blobs.");
 		} catch (TimeoutException ignored) {
 			// expected
 		}
 
-		assertThat(deleteAllFuture.isDone(), is(false));
+		assertThat(deleteAllHABlobsFuture.isDone(), is(false));
 	}
 
 	@Test
@@ -279,13 +280,13 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
 		// verify that we did not clear the BlobStore
 		try {
-			deleteAllFuture.get(50L, TimeUnit.MILLISECONDS);
+			deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
 			fail("We should not delete the HA blobs.");
 		} catch (TimeoutException ignored) {
 			// expected
 		}
 
-		assertThat(deleteAllFuture.isDone(), is(false));
+		assertThat(deleteAllHABlobsFuture.isDone(), is(false));
 	}
 
 	/**
@@ -413,25 +414,37 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		}
 	}
 
-	private static final class TestingDispatcher extends Dispatcher {
-		TestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
-			super(
-				rpcService,
-				endpointId,
-				configuration,
-				highAvailabilityServices,
-				submittedJobGraphStore,
-				resourceManagerGateway,
-				blobServer,
-				heartbeatServices,
-				jobManagerMetricGroup,
-				metricServiceQueryPath,
-				archivedExecutionGraphStore,
-				jobManagerRunnerFactory,
-				fatalErrorHandler,
-				null,
-				VoidHistoryServerArchivist.INSTANCE);
-		}
+	@Test
+	public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws Exception {
+		submittedJobGraphStore.setRemovalFailure(new Exception("Failed to Remove future"));
+		submitJob();
+
+		ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder()
+			.setJobID(jobId)
+			.setState(JobStatus.CANCELED)
+			.build();
+
+		resultFuture.complete(executionGraph);
+		terminationFuture.complete(null);
+
+		assertThat(cleanupJobFuture.get(), equalTo(jobId));
+		assertThat(deleteAllHABlobsFuture.isDone(), is(false));
+	}
+
+	@Test
+	public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception {
+		submitJob();
+
+		ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder()
+			.setJobID(jobId)
+			.setState(JobStatus.CANCELED)
+			.build();
+
+		resultFuture.complete(executionGraph);
+		terminationFuture.complete(null);
+
+		assertThat(cleanupJobFuture.get(), equalTo(jobId));
+		assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
 	}
 
 	private static final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/e984168e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ac4f1a8..d405fcd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
@@ -61,7 +60,6 @@ import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -73,19 +71,16 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collection;
@@ -128,14 +123,11 @@ public class DispatcherTest extends TestLogger {
 	@Rule
 	public TestName name = new TestName();
 
-	@Rule
-	public ExpectedException expectedException = ExpectedException.none();
-
 	private JobGraph jobGraph;
 
 	private TestingFatalErrorHandler fatalErrorHandler;
 
-	private FailableSubmittedJobGraphStore submittedJobGraphStore;
+	private FaultySubmittedJobGraphStore submittedJobGraphStore;
 
 	private TestingLeaderElectionService dispatcherLeaderElectionService;
 
@@ -175,7 +167,7 @@ public class DispatcherTest extends TestLogger {
 
 		fatalErrorHandler = new TestingFatalErrorHandler();
 		final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
-		submittedJobGraphStore = new FailableSubmittedJobGraphStore();
+		submittedJobGraphStore = new FaultySubmittedJobGraphStore();
 
 		dispatcherLeaderElectionService = new TestingLeaderElectionService();
 		jobMasterLeaderElectionService = new TestingLeaderElectionService();
@@ -301,30 +293,6 @@ public class DispatcherTest extends TestLogger {
 	}
 
 	@Test
-	public void testBlobsAreRemovedOnlyIfJobIsRemovedProperly() throws Exception {
-		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
-		PermanentBlobKey key = blobServer.putPermanent(TEST_JOB_ID, new byte[128]);
-		submittedJobGraphStore.setRemovalFailure(new Exception("Failed to Remove future"));
-		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
-		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
-
-		ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder()
-			.setJobID(TEST_JOB_ID)
-			.setState(JobStatus.CANCELED)
-			.build();
-
-		dispatcher.completeJobExecution(executionGraph);
-		//Assert that blob was not removed, since exception was thrown while removing the job
-		assertThat(blobServer.getFile(TEST_JOB_ID, key), notNullValue(File.class));
-		submittedJobGraphStore.setRemovalFailure(null);
-		dispatcher.completeJobExecution(executionGraph);
-
-		//Job removing did not throw exception now, blob should be null
-		expectedException.expect(NoSuchFileException.class);
-		blobServer.getFile(TEST_JOB_ID, key);
-	}
-
-	@Test
 	public void testOnAddedJobGraphRecoveryFailure() throws Exception {
 		final FlinkException expectedFailure = new FlinkException("Expected failure");
 		submittedJobGraphStore.setRecoveryFailure(expectedFailure);
@@ -644,38 +612,4 @@ public class DispatcherTest extends TestLogger {
 		}
 	}
 
-	private static final class FailableSubmittedJobGraphStore extends InMemorySubmittedJobGraphStore {
-
-		@Nullable
-		private Exception recoveryFailure = null;
-
-		@Nullable
-		private Exception removalFailure = null;
-
-		void setRecoveryFailure(@Nullable Exception recoveryFailure) {
-			this.recoveryFailure = recoveryFailure;
-		}
-
-		void setRemovalFailure(@Nullable Exception removalFailure) {
-			this.removalFailure = removalFailure;
-		}
-
-		@Override
-		public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
-			if (recoveryFailure != null) {
-				throw recoveryFailure;
-			} else {
-				return super.recoverJobGraph(jobId);
-			}
-		}
-
-		@Override
-		public synchronized void removeJobGraph(JobID jobId) throws Exception {
-			if (removalFailure != null) {
-				throw removalFailure;
-			} else {
-				super.removeJobGraph(jobId);
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e984168e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FaultySubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FaultySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FaultySubmittedJobGraphStore.java
new file mode 100644
index 0000000..9238ec3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FaultySubmittedJobGraphStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link InMemorySubmittedJobGraphStore} implementation which can throw artificial errors for
+ * testing purposes.
+ */
+final class FaultySubmittedJobGraphStore extends InMemorySubmittedJobGraphStore {
+
+	@Nullable
+	private Exception recoveryFailure = null;
+
+	@Nullable
+	private Exception removalFailure = null;
+
+	void setRecoveryFailure(@Nullable Exception recoveryFailure) {
+		this.recoveryFailure = recoveryFailure;
+	}
+
+	void setRemovalFailure(@Nullable Exception removalFailure) {
+		this.removalFailure = removalFailure;
+	}
+
+	@Override
+	public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+		if (recoveryFailure != null) {
+			throw recoveryFailure;
+		} else {
+			return super.recoverJobGraph(jobId);
+		}
+	}
+
+	@Override
+	public synchronized void removeJobGraph(JobID jobId) throws Exception {
+		if (removalFailure != null) {
+			throw removalFailure;
+		} else {
+			super.removeJobGraph(jobId);
+		}
+	}
+}


[2/2] flink git commit: [FLINK-9575] Remove job-related BLOBs only if the job was removed successfully

Posted by tr...@apache.org.
[FLINK-9575] Remove job-related BLOBs only if the job was removed successfully

This closes #6322.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6b2e8c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6b2e8c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6b2e8c5

Branch: refs/heads/master
Commit: f6b2e8c5ff0304e4835d2dc8c792a0d055679603
Parents: 5735fab
Author: Wosin <bl...@gmail.com>
Authored: Wed Jul 4 10:27:54 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 18 15:51:23 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  5 +--
 .../runtime/dispatcher/DispatcherTest.java      | 46 ++++++++++++++++++++
 3 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5306d6f..0aa9dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -574,11 +574,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		return jobManagerRunnerTerminationFuture.thenRunAsync(
 			() -> {
 				jobManagerMetricGroup.removeJob(jobId);
-				blobServer.cleanupJob(jobId, cleanupHA);
 
 				if (cleanupHA) {
 					try {
 						submittedJobGraphStore.removeJobGraph(jobId);
+						blobServer.cleanupJob(jobId, cleanupHA);
 					} catch (Exception e) {
 						log.warn("Could not properly remove job {} from submitted job graph store.", jobId);
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1c8174f..94469a8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1734,6 +1734,8 @@ class JobManager(
               // and the ZooKeeper client is closed. Not removing the job immediately allow the
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
+              val result  = blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+
             } catch {
               case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
             }
@@ -1759,10 +1761,7 @@ class JobManager(
       case None => None
     }
 
-    // remove all job-related BLOBs from local and HA store
     libraryCacheManager.unregisterJob(jobID)
-    blobServer.cleanupJob(jobID, removeJobFromStateBackend)
-
     jobManagerMetricGroup.removeJob(jobID)
 
     futureOption

http://git-wip-us.apache.org/repos/asf/flink/blob/f6b2e8c5/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 745e9cb..ac4f1a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
@@ -72,6 +73,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
@@ -83,6 +85,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collection;
@@ -125,6 +128,9 @@ public class DispatcherTest extends TestLogger {
 	@Rule
 	public TestName name = new TestName();
 
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
 	private JobGraph jobGraph;
 
 	private TestingFatalErrorHandler fatalErrorHandler;
@@ -295,6 +301,30 @@ public class DispatcherTest extends TestLogger {
 	}
 
 	@Test
+	public void testBlobsAreRemovedOnlyIfJobIsRemovedProperly() throws Exception {
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+		PermanentBlobKey key = blobServer.putPermanent(TEST_JOB_ID, new byte[128]);
+		submittedJobGraphStore.setRemovalFailure(new Exception("Failed to Remove future"));
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+		ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder()
+			.setJobID(TEST_JOB_ID)
+			.setState(JobStatus.CANCELED)
+			.build();
+
+		dispatcher.completeJobExecution(executionGraph);
+		//Assert that blob was not removed, since exception was thrown while removing the job
+		assertThat(blobServer.getFile(TEST_JOB_ID, key), notNullValue(File.class));
+		submittedJobGraphStore.setRemovalFailure(null);
+		dispatcher.completeJobExecution(executionGraph);
+
+		//Job removing did not throw exception now, blob should be null
+		expectedException.expect(NoSuchFileException.class);
+		blobServer.getFile(TEST_JOB_ID, key);
+	}
+
+	@Test
 	public void testOnAddedJobGraphRecoveryFailure() throws Exception {
 		final FlinkException expectedFailure = new FlinkException("Expected failure");
 		submittedJobGraphStore.setRecoveryFailure(expectedFailure);
@@ -619,10 +649,17 @@ public class DispatcherTest extends TestLogger {
 		@Nullable
 		private Exception recoveryFailure = null;
 
+		@Nullable
+		private Exception removalFailure = null;
+
 		void setRecoveryFailure(@Nullable Exception recoveryFailure) {
 			this.recoveryFailure = recoveryFailure;
 		}
 
+		void setRemovalFailure(@Nullable Exception removalFailure) {
+			this.removalFailure = removalFailure;
+		}
+
 		@Override
 		public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
 			if (recoveryFailure != null) {
@@ -631,5 +668,14 @@ public class DispatcherTest extends TestLogger {
 				return super.recoverJobGraph(jobId);
 			}
 		}
+
+		@Override
+		public synchronized void removeJobGraph(JobID jobId) throws Exception {
+			if (removalFailure != null) {
+				throw removalFailure;
+			} else {
+				super.removeJobGraph(jobId);
+			}
+		}
 	}
 }