You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2020/01/15 23:32:06 UTC
[twill] branch master updated: (TWILL-206) Cleanup cache directory
of older sessions
This is an automated email from the ASF dual-hosted git repository.
chtyim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/twill.git
The following commit(s) were added to refs/heads/master by this push:
new 5ed41d7 (TWILL-206) Cleanup cache directory of older sessions
5ed41d7 is described below
commit 5ed41d77c82efc0332890ddb2ef0b6acf3a01b6d
Author: Terence Yim <te...@google.com>
AuthorDate: Wed Jan 15 13:40:16 2020 -0800
(TWILL-206) Cleanup cache directory of older sessions
This closes #86 on Github.
Signed-off-by: Terence Yim <te...@google.com>
---
.../apache/twill/yarn/LocationCacheCleaner.java | 47 +++++++------
.../apache/twill/yarn/YarnTwillRunnerService.java | 43 ++++++------
.../twill/yarn/LocationCacheCleanerTest.java | 78 ++++++++++++++++++++++
.../org/apache/twill/yarn/LocationCacheTest.java | 5 +-
4 files changed, 127 insertions(+), 46 deletions(-)
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
index fed76a5..4ba5352 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
@@ -18,7 +18,6 @@
package org.apache.twill.yarn;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.conf.Configuration;
@@ -32,11 +31,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
/**
* Responsible for cleanup of {@link LocationCache}.
@@ -66,7 +67,7 @@ final class LocationCacheCleaner extends AbstractIdleService {
}
@Override
- protected void startUp() throws Exception {
+ protected void startUp() {
scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("location-cache-cleanup"));
scheduler.execute(new Runnable() {
@Override
@@ -88,31 +89,27 @@ final class LocationCacheCleaner extends AbstractIdleService {
}
@Override
- protected void shutDown() throws Exception {
+ protected void shutDown() {
scheduler.shutdownNow();
}
@VisibleForTesting
void forceCleanup(final long currentTime) {
- Futures.getUnchecked(scheduler.submit(new Runnable() {
- @Override
- public void run() {
- cleanup(currentTime);
- }
- }));
+ Futures.getUnchecked(scheduler.submit(() -> cleanup(currentTime)));
}
/**
* Performs cleanup based on the given time.
*/
- private void cleanup(long currentTime) {
+ @VisibleForTesting
+ void cleanup(long currentTime) {
// First go through the pending cleanup list and remove those that can be removed
Iterator<PendingCleanup> iterator = pendingCleanups.iterator();
while (iterator.hasNext()) {
PendingCleanup pendingCleanup = iterator.next();
// If rejected by the predicate, it means it is being used, hence remove it from the pending cleanup list.
- if (!cleanupPredicate.apply(pendingCleanup.getLocation())) {
+ if (!cleanupPredicate.test(pendingCleanup.getLocation())) {
iterator.remove();
} else {
try {
@@ -133,15 +130,18 @@ final class LocationCacheCleaner extends AbstractIdleService {
try {
for (Location cacheDir : cacheBaseLocation.list()) {
try {
- for (Location location : cacheDir.list()) {
- if (cleanupPredicate.apply(location)) {
- long expireTime = currentTime;
- if (cacheDir.getName().equals(sessionId)) {
- expireTime += expiry;
- } else {
- // If the cache entry is from different YarnTwillRunnerService session, use the anti expiry time.
- expireTime += antiqueExpiry;
- }
+ boolean currentSession = cacheDir.getName().equals(sessionId);
+ List<Location> entries = cacheDir.list();
+ if (!currentSession && entries.isEmpty()) {
+ // Delete empty directory of old session
+ cacheDir.delete();
+ continue;
+ }
+
+ // If the cache entry is from a different YarnTwillRunnerService session, use the antique expiry time.
+ long expireTime = computeExpiry(currentTime, currentSession ? expiry : antiqueExpiry);
+ for (Location location : entries) {
+ if (cleanupPredicate.test(location)) {
// If the location is already pending for cleanup, this won't update the expire time as
// the comparison of PendingCleanup is only by location.
if (pendingCleanups.add(new PendingCleanup(location, expireTime))) {
@@ -158,6 +158,13 @@ final class LocationCacheCleaner extends AbstractIdleService {
}
}
+ private long computeExpiry(long currentTime, long increment) {
+ if (Long.MAX_VALUE - increment < currentTime) {
+ return Long.MAX_VALUE;
+ }
+ return currentTime + increment;
+ }
+
/**
* Class for holding information about cache location that is pending to be removed.
* The equality and hash code is only based on the location.
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 20627e2..15902f2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -405,34 +405,31 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
private LocationCacheCleaner startLocationCacheCleaner(final Location cacheBase, final String sessionId) {
LocationCacheCleaner cleaner = new LocationCacheCleaner(
- yarnConfig, cacheBase, sessionId, new Predicate<Location>() {
- @Override
- public boolean apply(Location location) {
- // Collects all the locations that is being used by any live applications
- Set<Location> activeLocations = new HashSet<>();
- synchronized (YarnTwillRunnerService.this) {
- for (YarnTwillController controller : controllers.values()) {
- ApplicationMasterLiveNodeData amLiveNodeData = controller.getApplicationMasterLiveNodeData();
- if (amLiveNodeData != null) {
- for (LocalFile localFile : amLiveNodeData.getLocalFiles()) {
- activeLocations.add(locationFactory.create(localFile.getURI()));
- }
+ yarnConfig, cacheBase, sessionId, location -> {
+ // Collects all the locations that are being used by any live applications
+ Set<Location> activeLocations = new HashSet<>();
+ synchronized (YarnTwillRunnerService.this) {
+ for (YarnTwillController controller : controllers.values()) {
+ ApplicationMasterLiveNodeData amLiveNodeData = controller.getApplicationMasterLiveNodeData();
+ if (amLiveNodeData != null) {
+ for (LocalFile localFile : amLiveNodeData.getLocalFiles()) {
+ activeLocations.add(locationFactory.create(localFile.getURI()));
}
}
}
+ }
- try {
- // Always keep the launcher.jar and twill.jar from the current session as they should never change,
- // hence never expires
- activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.LAUNCHER_JAR));
- activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.TWILL_JAR));
- } catch (IOException e) {
- // This should not happen
- LOG.warn("Failed to construct cache location", e);
- }
-
- return !activeLocations.contains(location);
+ try {
+ // Always keep the launcher.jar and twill.jar from the current session as they should never change,
+ // hence never expire
+ activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.LAUNCHER_JAR));
+ activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.TWILL_JAR));
+ } catch (IOException e) {
+ // This should not happen
+ LOG.warn("Failed to construct cache location", e);
}
+
+ return !activeLocations.contains(location);
});
cleaner.startAndWait();
return cleaner;
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheCleanerTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheCleanerTest.java
new file mode 100644
index 0000000..a620f10
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheCleanerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.twill.filesystem.LocalLocationFactory;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.io.BasicLocationCache;
+import org.apache.twill.internal.io.LocationCache;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Collections;
+
+/**
+ * Unit test for {@link LocationCacheCleaner}.
+ */
+public class LocationCacheCleanerTest {
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+ @Test
+ public void testCleanup() throws IOException {
+ LocationFactory lf = new LocalLocationFactory(TEMP_FOLDER.newFolder());
+ Location cacheBase = lf.create("cache");
+ LocationCache.Loader cacheLoader = new LocationCache.Loader() {
+ @Override
+ public void load(String name, Location targetLocation) throws IOException {
+ try (PrintStream printer = new PrintStream(targetLocation.getOutputStream(), true, "UTF-8")) {
+ printer.println(name);
+ }
+ }
+ };
+
+ // Create a cache with a session
+ LocationCache cache = new BasicLocationCache(cacheBase.append("old"));
+ cache.get("old", cacheLoader);
+
+ // Create a new cache with a different session
+ String sessionId = "new";
+ cache = new BasicLocationCache(cacheBase.append(sessionId));
+ cache.get("new", cacheLoader);
+
+ // Cleanup all files
+ LocationCacheCleaner cleaner = new LocationCacheCleaner(new Configuration(), cacheBase,
+ sessionId, location -> true);
+ // The first cleanup will add files to pending deletion
+ cleaner.cleanup(Long.MAX_VALUE);
+ // Second call to cleanup will delete files and old session directories.
+ cleaner.cleanup(Long.MAX_VALUE);
+
+ // The cache directory should only have the new session directory
+ Assert.assertEquals(Collections.singletonList(cacheBase.append(sessionId)), cacheBase.list());
+ // The new session cache directory should be empty
+ Assert.assertTrue(cacheBase.append(sessionId).list().isEmpty());
+ }
+}
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
index 63ca28b..50e1a3a 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
@@ -117,9 +117,8 @@ public class LocationCacheTest {
.forceLocationCacheCleanup(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30) +
Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
- // Now there shouldn't be any file under the current session cache directory
- List<Location> locations = currentSessionCache.list();
- Assert.assertTrue("Location is not empty " + locations, locations.isEmpty());
+ // The previous session cache directory should be deleted after a new session was running
+ Assert.assertFalse(currentSessionCache.exists());
}
/**