You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text copy.
Posted to commits@twill.apache.org by ch...@apache.org on 2020/01/15 23:32:06 UTC

[twill] branch master updated: (TWILL-206) Cleanup cache directory of older sessions

This is an automated email from the ASF dual-hosted git repository.

chtyim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/twill.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ed41d7  (TWILL-206) Cleanup cache directory of older sessions
5ed41d7 is described below

commit 5ed41d77c82efc0332890ddb2ef0b6acf3a01b6d
Author: Terence Yim <te...@google.com>
AuthorDate: Wed Jan 15 13:40:16 2020 -0800

    (TWILL-206) Cleanup cache directory of older sessions
    
    This closes #86 on Github.
    
    Signed-off-by: Terence Yim <te...@google.com>
---
 .../apache/twill/yarn/LocationCacheCleaner.java    | 47 +++++++------
 .../apache/twill/yarn/YarnTwillRunnerService.java  | 43 ++++++------
 .../twill/yarn/LocationCacheCleanerTest.java       | 78 ++++++++++++++++++++++
 .../org/apache/twill/yarn/LocationCacheTest.java   |  5 +-
 4 files changed, 127 insertions(+), 46 deletions(-)

diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
index fed76a5..4ba5352 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
@@ -18,7 +18,6 @@
 package org.apache.twill.yarn;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.conf.Configuration;
@@ -32,11 +31,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 /**
  * Responsible for cleanup of {@link LocationCache}.
@@ -66,7 +67,7 @@ final class LocationCacheCleaner extends AbstractIdleService {
   }
 
   @Override
-  protected void startUp() throws Exception {
+  protected void startUp() {
     scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("location-cache-cleanup"));
     scheduler.execute(new Runnable() {
       @Override
@@ -88,31 +89,27 @@ final class LocationCacheCleaner extends AbstractIdleService {
   }
 
   @Override
-  protected void shutDown() throws Exception {
+  protected void shutDown() {
     scheduler.shutdownNow();
   }
 
   @VisibleForTesting
   void forceCleanup(final long currentTime) {
-    Futures.getUnchecked(scheduler.submit(new Runnable() {
-      @Override
-      public void run() {
-        cleanup(currentTime);
-      }
-    }));
+    Futures.getUnchecked(scheduler.submit(() -> cleanup(currentTime)));
   }
 
   /**
    * Performs cleanup based on the given time.
    */
-  private void cleanup(long currentTime) {
+  @VisibleForTesting
+  void cleanup(long currentTime) {
     // First go through the pending cleanup list and remove those that can be removed
     Iterator<PendingCleanup> iterator = pendingCleanups.iterator();
     while (iterator.hasNext()) {
       PendingCleanup pendingCleanup = iterator.next();
 
       // If rejected by the predicate, it means it is being used, hence remove it from the pending cleanup list.
-      if (!cleanupPredicate.apply(pendingCleanup.getLocation())) {
+      if (!cleanupPredicate.test(pendingCleanup.getLocation())) {
         iterator.remove();
       } else {
         try {
@@ -133,15 +130,18 @@ final class LocationCacheCleaner extends AbstractIdleService {
     try {
       for (Location cacheDir : cacheBaseLocation.list()) {
         try {
-          for (Location location : cacheDir.list()) {
-            if (cleanupPredicate.apply(location)) {
-              long expireTime = currentTime;
-              if (cacheDir.getName().equals(sessionId)) {
-                expireTime += expiry;
-              } else {
-                // If the cache entry is from different YarnTwillRunnerService session, use the anti expiry time.
-                expireTime += antiqueExpiry;
-              }
+          boolean currentSession = cacheDir.getName().equals(sessionId);
+          List<Location> entries = cacheDir.list();
+          if (!currentSession && entries.isEmpty()) {
+            // Delete empty directory of old session
+            cacheDir.delete();
+            continue;
+          }
+
+          // If the cache entry is from different YarnTwillRunnerService session, use the anti expiry time.
+          long expireTime = computeExpiry(currentTime, currentSession ? expiry : antiqueExpiry);
+          for (Location location : entries) {
+            if (cleanupPredicate.test(location)) {
               // If the location is already pending for cleanup, this won't update the expire time as
               // the comparison of PendingCleanup is only by location.
               if (pendingCleanups.add(new PendingCleanup(location, expireTime))) {
@@ -158,6 +158,13 @@ final class LocationCacheCleaner extends AbstractIdleService {
     }
   }
 
+  private long computeExpiry(long currentTime, long increment) {
+    if (Long.MAX_VALUE - increment < currentTime) {
+      return Long.MAX_VALUE;
+    }
+    return currentTime + increment;
+  }
+
   /**
    * Class for holding information about cache location that is pending to be removed.
    * The equality and hash code is only based on the location.
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 20627e2..15902f2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -405,34 +405,31 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
 
   private LocationCacheCleaner startLocationCacheCleaner(final Location cacheBase, final String sessionId) {
     LocationCacheCleaner cleaner = new LocationCacheCleaner(
-      yarnConfig, cacheBase, sessionId, new Predicate<Location>() {
-        @Override
-        public boolean apply(Location location) {
-          // Collects all the locations that is being used by any live applications
-          Set<Location> activeLocations = new HashSet<>();
-          synchronized (YarnTwillRunnerService.this) {
-            for (YarnTwillController controller : controllers.values()) {
-              ApplicationMasterLiveNodeData amLiveNodeData = controller.getApplicationMasterLiveNodeData();
-              if (amLiveNodeData != null) {
-                for (LocalFile localFile : amLiveNodeData.getLocalFiles()) {
-                  activeLocations.add(locationFactory.create(localFile.getURI()));
-                }
+      yarnConfig, cacheBase, sessionId, location -> {
+        // Collects all the locations that is being used by any live applications
+        Set<Location> activeLocations = new HashSet<>();
+        synchronized (YarnTwillRunnerService.this) {
+          for (YarnTwillController controller : controllers.values()) {
+            ApplicationMasterLiveNodeData amLiveNodeData = controller.getApplicationMasterLiveNodeData();
+            if (amLiveNodeData != null) {
+              for (LocalFile localFile : amLiveNodeData.getLocalFiles()) {
+                activeLocations.add(locationFactory.create(localFile.getURI()));
               }
             }
           }
+        }
 
-          try {
-            // Always keep the launcher.jar and twill.jar from the current session as they should never change,
-            // hence never expires
-            activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.LAUNCHER_JAR));
-            activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.TWILL_JAR));
-          } catch (IOException e) {
-            // This should not happen
-            LOG.warn("Failed to construct cache location", e);
-          }
-
-          return !activeLocations.contains(location);
+        try {
+          // Always keep the launcher.jar and twill.jar from the current session as they should never change,
+          // hence never expires
+          activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.LAUNCHER_JAR));
+          activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.TWILL_JAR));
+        } catch (IOException e) {
+          // This should not happen
+          LOG.warn("Failed to construct cache location", e);
         }
+
+        return !activeLocations.contains(location);
       });
     cleaner.startAndWait();
     return cleaner;
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheCleanerTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheCleanerTest.java
new file mode 100644
index 0000000..a620f10
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheCleanerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.twill.filesystem.LocalLocationFactory;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.io.BasicLocationCache;
+import org.apache.twill.internal.io.LocationCache;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Collections;
+
+/**
+ * Unit test for {@link LocationCacheCleaner}.
+ */
+public class LocationCacheCleanerTest {
+
+  @ClassRule
+  public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+  @Test
+  public void testCleanup() throws IOException {
+    LocationFactory lf = new LocalLocationFactory(TEMP_FOLDER.newFolder());
+    Location cacheBase = lf.create("cache");
+    LocationCache.Loader cacheLoader = new LocationCache.Loader() {
+      @Override
+      public void load(String name, Location targetLocation) throws IOException {
+        try (PrintStream printer = new PrintStream(targetLocation.getOutputStream(), true, "UTF-8")) {
+          printer.println(name);
+        }
+      }
+    };
+
+    // Create a cache with a session
+    LocationCache cache = new BasicLocationCache(cacheBase.append("old"));
+    cache.get("old", cacheLoader);
+
+    // Create a new cache with a different session
+    String sessionId = "new";
+    cache = new BasicLocationCache(cacheBase.append(sessionId));
+    cache.get("new", cacheLoader);
+
+    // Cleanup all files
+    LocationCacheCleaner cleaner = new LocationCacheCleaner(new Configuration(), cacheBase,
+                                                            sessionId, location -> true);
+    // The first cleanup will add files to pending deletion
+    cleaner.cleanup(Long.MAX_VALUE);
+    // Second call to cleanup will delete files and old sessions directories.
+    cleaner.cleanup(Long.MAX_VALUE);
+
+    // The cache directory should only have the new session directory
+    Assert.assertEquals(Collections.singletonList(cacheBase.append(sessionId)), cacheBase.list());
+    // The new session cache directory should be empty
+    Assert.assertTrue(cacheBase.append(sessionId).list().isEmpty());
+  }
+}
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
index 63ca28b..50e1a3a 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
@@ -117,9 +117,8 @@ public class LocationCacheTest {
       .forceLocationCacheCleanup(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30) +
                                    Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
 
-    // Now there shouldn't be any file under the current session cache directory
-    List<Location> locations = currentSessionCache.list();
-    Assert.assertTrue("Location is not empty " + locations, locations.isEmpty());
+    // The previous session cache directory should be deleted after a new session was running
+    Assert.assertFalse(currentSessionCache.exists());
   }
 
   /**