Posted to commits@flink.apache.org by ch...@apache.org on 2020/07/08 19:11:32 UTC
[flink] branch release-1.10 updated: [FLINK-18097][history] Delete all job-related files on expiration
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new d64a5a0 [FLINK-18097][history] Delete all job-related files on expiration
d64a5a0 is described below
commit d64a5a0c2566416158c67719dc08d7623e14fad8
Author: Milan Nikl <mi...@firma.seznam.cz>
AuthorDate: Tue Jun 9 17:09:32 2020 +0200
[FLINK-18097][history] Delete all job-related files on expiration
---
.../history/HistoryServerArchiveFetcher.java | 45 +++++++++++----------
.../webmonitor/history/HistoryServerTest.java | 46 ++++++++++++++++++++--
2 files changed, 67 insertions(+), 24 deletions(-)
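In brief: before this change, the expiration path deleted only the overview JSON and the per-job directory, leaving the job's own <jobID>.json file in the jobs directory behind, and the fetch-failure path carried a near-identical copy of the cleanup code. Both paths now delegate to a single deleteJobFiles helper that also deletes that job JSON and logs failed deletions at WARN instead of DEBUG. As a rough standalone illustration of the best-effort pattern the helper follows (class and field names here are invented for the example, not Flink's actual classes):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;

    // Illustrative only: best-effort cleanup that attempts every deletion
    // and logs failures instead of aborting on the first IOException.
    class JobFileCleaner {
        private static final String JSON_FILE_ENDING = ".json";
        private final File webOverviewDir;
        private final File webJobDir;

        JobFileCleaner(File webOverviewDir, File webJobDir) {
            this.webOverviewDir = webOverviewDir;
            this.webJobDir = webJobDir;
        }

        void deleteJobFiles(String jobID) {
            // Each step is independent; a failure in one must not skip the others.
            tryDelete(new File(webOverviewDir, jobID + JSON_FILE_ENDING));
            tryDeleteRecursively(new File(webJobDir, jobID));
            tryDelete(new File(webJobDir, jobID + JSON_FILE_ENDING));
        }

        private static void tryDelete(File file) {
            try {
                Files.deleteIfExists(file.toPath());
            } catch (IOException e) {
                System.err.println("Could not delete " + file + ": " + e);
            }
        }

        private static void tryDeleteRecursively(File dir) {
            // Delete children first so the directory is empty when its turn comes.
            File[] children = dir.listFiles();
            if (children != null) {
                for (File child : children) {
                    tryDeleteRecursively(child);
                }
            }
            tryDelete(dir);
        }
    }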
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 5a41173..9b35c8e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -263,20 +263,7 @@ class HistoryServerArchiveFetcher {
LOG.info("Processing archive {} finished.", jobArchivePath);
} catch (IOException e) {
LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
- // Make sure we do not include this job in the overview
- try {
- Files.delete(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
- } catch (IOException ioe) {
- LOG.debug("Could not delete file from overview directory.", ioe);
- }
-
- // Clean up job files we may have created
- File jobDirectory = new File(webJobDir, jobID);
- try {
- FileUtils.deleteDirectory(jobDirectory);
- } catch (IOException ioe) {
- LOG.debug("Could not clean up job directory.", ioe);
- }
+ deleteJobFiles(jobID);
}
}
}
@@ -302,18 +289,36 @@ class HistoryServerArchiveFetcher {
cachedArchives.removeAll(jobsToRemove);
jobsToRemove.forEach(removedJobID -> {
- try {
- Files.deleteIfExists(new File(webOverviewDir, removedJobID + JSON_FILE_ENDING).toPath());
- FileUtils.deleteDirectory(new File(webJobDir, removedJobID));
- } catch (IOException e) {
- LOG.error("Failure while removing job overview for job {}.", removedJobID, e);
- }
+ deleteJobFiles(removedJobID);
deleteLog.add(new ArchiveEvent(removedJobID, ArchiveEventType.DELETED));
});
return deleteLog;
}
+ private void deleteJobFiles(String jobID) {
+ // Make sure we do not include this job in the overview
+ try {
+ Files.deleteIfExists(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete file from overview directory.", ioe);
+ }
+
+ // Clean up job files we may have created
+ File jobDirectory = new File(webJobDir, jobID);
+ try {
+ FileUtils.deleteDirectory(jobDirectory);
+ } catch (IOException ioe) {
+ LOG.warn("Could not clean up job directory.", ioe);
+ }
+
+ try {
+ Files.deleteIfExists(new File(webJobDir, jobID + JSON_FILE_ENDING).toPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete file from job directory.", ioe);
+ }
+ }
+
}
private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
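One detail worth noting in the hunks above: the failure-path cleanup switched from Files.delete to Files.deleteIfExists. The former throws NoSuchFileException when the target is already gone, while the latter simply returns false, which is the right fit for best-effort cleanup. A quick standalone demonstration of the difference (JDK only; the temp-file name is arbitrary):

    import java.nio.file.Files;
    import java.nio.file.NoSuchFileException;
    import java.nio.file.Path;

    public class DeleteSemantics {
        public static void main(String[] args) throws Exception {
            Path file = Files.createTempFile("job-overview", ".json");
            Files.delete(file); // ok: the file exists

            // Second delete: deleteIfExists reports "nothing to do"...
            boolean deleted = Files.deleteIfExists(file);
            System.out.println("deleteIfExists returned " + deleted); // false

            // ...while delete treats the missing file as an error.
            try {
                Files.delete(file);
            } catch (NoSuchFileException e) {
                System.out.println("delete threw NoSuchFileException");
            }
        }
    }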
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 71e83078..4c30dd4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -60,12 +60,19 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertTrue;
/**
@@ -174,7 +181,8 @@ public class HistoryServerTest extends TestLogger {
waitForArchivesCreation(numJobs);
CountDownLatch numExpectedArchivedJobs = new CountDownLatch(numJobs);
- CountDownLatch numExpectedExpiredJobs = new CountDownLatch(numExpiredJobs);
+ CountDownLatch firstArchiveExpiredLatch = new CountDownLatch(numExpiredJobs);
+ CountDownLatch allArchivesExpiredLatch = new CountDownLatch(cleanupExpiredJobs ? numJobs : 0);
Configuration historyServerConfig = createTestConfiguration(cleanupExpiredJobs);
@@ -187,7 +195,8 @@ public class HistoryServerTest extends TestLogger {
numExpectedArchivedJobs.countDown();
break;
case DELETED:
- numExpectedExpiredJobs.countDown();
+ firstArchiveExpiredLatch.countDown();
+ allArchivesExpiredLatch.countDown();
break;
}
});
@@ -209,9 +218,9 @@ public class HistoryServerTest extends TestLogger {
// delete one archive from jm
Files.deleteIfExists(jmDirectory.toPath().resolve(jobIdToDelete));
- assertTrue(numExpectedExpiredJobs.await(10L, TimeUnit.SECONDS));
+ assertTrue(firstArchiveExpiredLatch.await(10L, TimeUnit.SECONDS));
- // check that archive is present in hs
+ // check that archive is still/no longer present in hs
Collection<JobDetails> jobsAfterDeletion = getJobsOverview(baseUrl).getJobs();
Assert.assertEquals(numJobs - numExpiredJobs, jobsAfterDeletion.size());
Assert.assertEquals(1 - numExpiredJobs, jobsAfterDeletion.stream()
@@ -219,11 +228,40 @@ public class HistoryServerTest extends TestLogger {
.map(JobID::toString)
.filter(jobId -> jobId.equals(jobIdToDelete))
.count());
+
+ // delete remaining archives from jm and ensure files are cleaned up
+ List<String> remainingJobIds = jobsAfterDeletion.stream()
+ .map(JobDetails::getJobId)
+ .map(JobID::toString)
+ .collect(Collectors.toList());
+
+ for (String remainingJobId : remainingJobIds) {
+ Files.deleteIfExists(jmDirectory.toPath().resolve(remainingJobId));
+ }
+
+ assertTrue(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS));
+
+ assertJobFilesCleanedUp(cleanupExpiredJobs);
} finally {
hs.stop();
}
}
+ private void assertJobFilesCleanedUp(boolean jobFilesShouldBeDeleted) throws IOException {
+ try (Stream<Path> paths = Files.walk(hsDirectory.toPath())) {
+ final List<Path> jobFiles = paths
+ .filter(path -> !path.equals(hsDirectory.toPath()))
+ .map(path -> hsDirectory.toPath().relativize(path))
+ .filter(path -> !path.equals(Paths.get("config.json")))
+ .filter(path -> !path.equals(Paths.get("jobs")))
+ .filter(path -> !path.equals(Paths.get("jobs", "overview.json")))
+ .filter(path -> !path.equals(Paths.get("overviews")))
+ .collect(Collectors.toList());
+
+ assertThat(jobFiles, jobFilesShouldBeDeleted ? empty() : not(empty()));
+ }
+ }
+
private void waitForArchivesCreation(int numJobs) throws InterruptedException {
// the job is archived asynchronously after env.execute() returns
File[] archives = jmDirectory.listFiles();
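The new assertJobFilesCleanedUp helper boils down to a reusable pattern: walk the HistoryServer's web directory, relativize each path against the root, and drop everything on an allow-list of files expected to survive (config.json, jobs/overview.json, and the jobs/overviews directories themselves), so whatever remains must be leftover job data. A generic standalone version of that walk (method and variable names are mine, not the test's):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    final class ResidualFiles {
        // Returns every path under root that is neither root itself
        // nor on the allow-list of expected relative paths.
        static List<Path> find(Path root, Set<Path> allowedRelativePaths) throws IOException {
            try (Stream<Path> paths = Files.walk(root)) { // walk() streams must be closed
                return paths
                    .filter(path -> !path.equals(root))
                    .map(root::relativize)
                    .filter(relative -> !allowedRelativePaths.contains(relative))
                    .collect(Collectors.toList());
            }
        }
    }

An empty result means cleanup worked; in the test, Hamcrest's empty()/not(empty()) matchers encode the two expected outcomes depending on whether expired-job cleanup is enabled in the HistoryServer configuration.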