Posted to commits@flink.apache.org by ch...@apache.org on 2020/07/08 19:11:32 UTC

[flink] branch release-1.10 updated: [FLINK-18097][history] Delete all job-related files on expiration

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new d64a5a0  [FLINK-18097][history] Delete all job-related files on expiration
d64a5a0 is described below

commit d64a5a0c2566416158c67719dc08d7623e14fad8
Author: Milan Nikl <mi...@firma.seznam.cz>
AuthorDate: Tue Jun 9 17:09:32 2020 +0200

    [FLINK-18097][history] Delete all job-related files on expiration
---
 .../history/HistoryServerArchiveFetcher.java       | 45 +++++++++++----------
 .../webmonitor/history/HistoryServerTest.java      | 46 ++++++++++++++++++++--
 2 files changed, 67 insertions(+), 24 deletions(-)
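
For readers skimming the patch below: the fix consolidates what were previously two slightly different ad-hoc cleanups into a single deleteJobFiles(jobID) helper, and extends it to also remove the per-job JSON file under the jobs directory, which neither of the old code paths deleted. A minimal standalone sketch of the resulting cleanup logic, using only JDK file APIs; the JobFileCleaner wrapper class and the recursive delete are illustrative stand-ins, the real code lives in HistoryServerArchiveFetcher and uses Flink's FileUtils.deleteDirectory:

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;

    class JobFileCleaner {
        private static final String JSON_FILE_ENDING = ".json";

        private final File webOverviewDir; // e.g. <web-dir>/overviews
        private final File webJobDir;      // e.g. <web-dir>/jobs

        JobFileCleaner(File webOverviewDir, File webJobDir) {
            this.webOverviewDir = webOverviewDir;
            this.webJobDir = webJobDir;
        }

        void deleteJobFiles(String jobID) {
            // 1) overview entry: <overviews>/<jobID>.json
            tryDelete(new File(webOverviewDir, jobID + JSON_FILE_ENDING));
            // 2) per-job directory: <jobs>/<jobID>/
            deleteRecursively(new File(webJobDir, jobID));
            // 3) per-job summary: <jobs>/<jobID>.json (the file the old code missed)
            tryDelete(new File(webJobDir, jobID + JSON_FILE_ENDING));
        }

        private static void tryDelete(File file) {
            try {
                Files.deleteIfExists(file.toPath());
            } catch (IOException e) {
                // best effort, mirroring the warn-and-continue handling in the patch
            }
        }

        // JDK-only stand-in for the FileUtils.deleteDirectory call in the patch
        private static void deleteRecursively(File dir) {
            File[] children = dir.listFiles();
            if (children != null) {
                for (File child : children) {
                    deleteRecursively(child);
                }
            }
            dir.delete();
        }
    }

Note that this deletion path only runs when the history server is configured to clean up expired jobs, which is the boolean threaded through createTestConfiguration in the test changes below (the historyserver.archive.clean-expired-jobs option, assuming current option naming).
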

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 5a41173..9b35c8e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -263,20 +263,7 @@ class HistoryServerArchiveFetcher {
 								LOG.info("Processing archive {} finished.", jobArchivePath);
 							} catch (IOException e) {
 								LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
-								// Make sure we do not include this job in the overview
-								try {
-									Files.delete(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
-								} catch (IOException ioe) {
-									LOG.debug("Could not delete file from overview directory.", ioe);
-								}
-
-								// Clean up job files we may have created
-								File jobDirectory = new File(webJobDir, jobID);
-								try {
-									FileUtils.deleteDirectory(jobDirectory);
-								} catch (IOException ioe) {
-									LOG.debug("Could not clean up job directory.", ioe);
-								}
+								deleteJobFiles(jobID);
 							}
 						}
 					}
@@ -302,18 +289,36 @@ class HistoryServerArchiveFetcher {
 
 			cachedArchives.removeAll(jobsToRemove);
 			jobsToRemove.forEach(removedJobID -> {
-				try {
-					Files.deleteIfExists(new File(webOverviewDir, removedJobID + JSON_FILE_ENDING).toPath());
-					FileUtils.deleteDirectory(new File(webJobDir, removedJobID));
-				} catch (IOException e) {
-					LOG.error("Failure while removing job overview for job {}.", removedJobID, e);
-				}
+				deleteJobFiles(removedJobID);
 				deleteLog.add(new ArchiveEvent(removedJobID, ArchiveEventType.DELETED));
 			});
 
 			return deleteLog;
 		}
 
+		private void deleteJobFiles(String jobID) {
+			// Make sure we do not include this job in the overview
+			try {
+				Files.deleteIfExists(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
+			} catch (IOException ioe) {
+				LOG.warn("Could not delete file from overview directory.", ioe);
+			}
+
+			// Clean up job files we may have created
+			File jobDirectory = new File(webJobDir, jobID);
+			try {
+				FileUtils.deleteDirectory(jobDirectory);
+			} catch (IOException ioe) {
+				LOG.warn("Could not clean up job directory.", ioe);
+			}
+
+			try {
+				Files.deleteIfExists(new File(webJobDir, jobID + JSON_FILE_ENDING).toPath());
+			} catch (IOException ioe) {
+				LOG.warn("Could not delete file from job directory.", ioe);
+			}
+		}
+
 	}
 
 	private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 71e83078..4c30dd4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -60,12 +60,19 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -174,7 +181,8 @@ public class HistoryServerTest extends TestLogger {
 		waitForArchivesCreation(numJobs);
 
 		CountDownLatch numExpectedArchivedJobs = new CountDownLatch(numJobs);
-		CountDownLatch numExpectedExpiredJobs = new CountDownLatch(numExpiredJobs);
+		CountDownLatch firstArchiveExpiredLatch = new CountDownLatch(numExpiredJobs);
+		CountDownLatch allArchivesExpiredLatch = new CountDownLatch(cleanupExpiredJobs ? numJobs : 0);
 
 		Configuration historyServerConfig = createTestConfiguration(cleanupExpiredJobs);
 
@@ -187,7 +195,8 @@ public class HistoryServerTest extends TestLogger {
 							numExpectedArchivedJobs.countDown();
 							break;
 						case DELETED:
-							numExpectedExpiredJobs.countDown();
+							firstArchiveExpiredLatch.countDown();
+							allArchivesExpiredLatch.countDown();
 							break;
 					}
 				});
@@ -209,9 +218,9 @@ public class HistoryServerTest extends TestLogger {
 			// delete one archive from jm
 			Files.deleteIfExists(jmDirectory.toPath().resolve(jobIdToDelete));
 
-			assertTrue(numExpectedExpiredJobs.await(10L, TimeUnit.SECONDS));
+			assertTrue(firstArchiveExpiredLatch.await(10L, TimeUnit.SECONDS));
 
-			// check that archive is present in hs
+			// check that archive is still/no longer present in hs
 			Collection<JobDetails> jobsAfterDeletion = getJobsOverview(baseUrl).getJobs();
 			Assert.assertEquals(numJobs - numExpiredJobs, jobsAfterDeletion.size());
 			Assert.assertEquals(1 - numExpiredJobs, jobsAfterDeletion.stream()
@@ -219,11 +228,40 @@ public class HistoryServerTest extends TestLogger {
 				.map(JobID::toString)
 				.filter(jobId -> jobId.equals(jobIdToDelete))
 				.count());
+
+			// delete remaining archives from jm and ensure files are cleaned up
+			List<String> remainingJobIds = jobsAfterDeletion.stream()
+				.map(JobDetails::getJobId)
+				.map(JobID::toString)
+				.collect(Collectors.toList());
+
+			for (String remainingJobId : remainingJobIds) {
+				Files.deleteIfExists(jmDirectory.toPath().resolve(remainingJobId));
+			}
+
+			assertTrue(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS));
+
+			assertJobFilesCleanedUp(cleanupExpiredJobs);
 		} finally {
 			hs.stop();
 		}
 	}
 
+	private void assertJobFilesCleanedUp(boolean jobFilesShouldBeDeleted) throws IOException {
+		try (Stream<Path> paths = Files.walk(hsDirectory.toPath())) {
+			final List<Path> jobFiles = paths
+				.filter(path -> !path.equals(hsDirectory.toPath()))
+				.map(path -> hsDirectory.toPath().relativize(path))
+				.filter(path -> !path.equals(Paths.get("config.json")))
+				.filter(path -> !path.equals(Paths.get("jobs")))
+				.filter(path -> !path.equals(Paths.get("jobs", "overview.json")))
+				.filter(path -> !path.equals(Paths.get("overviews")))
+				.collect(Collectors.toList());
+
+			assertThat(jobFiles, jobFilesShouldBeDeleted ? empty() : not(empty()));
+		}
+	}
+
 	private void waitForArchivesCreation(int numJobs) throws InterruptedException {
 		// the job is archived asynchronously after env.execute() returns
 		File[] archives = jmDirectory.listFiles();