You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2016/10/28 15:59:39 UTC

hadoop git commit: YARN-5767. Fix the order that resources are cleaned up from the local Public/Private caches. Contributed by Chris Trezzo

Repository: hadoop
Updated Branches:
  refs/heads/trunk 7146359bf -> 1b79c417d


YARN-5767. Fix the order that resources are cleaned up from the local Public/Private caches. Contributed by Chris Trezzo


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b79c417
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b79c417
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b79c417

Branch: refs/heads/trunk
Commit: 1b79c417dca17bcd2e031864bc6ca07254c61b47
Parents: 7146359
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Oct 28 15:58:04 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Oct 28 15:58:04 2016 +0000

----------------------------------------------------------------------
 .../localizer/LocalCacheCleaner.java            | 182 ++++++++++++++
 .../localizer/LocalResourcesTrackerImpl.java    |   3 +-
 .../localizer/ResourceLocalizationService.java  |  37 +--
 .../localizer/ResourceRetentionSet.java         |  96 --------
 .../localizer/TestLocalCacheCleanup.java        | 235 +++++++++++++++++++
 .../localizer/TestResourceRetention.java        | 106 ---------
 6 files changed, 440 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java
new file mode 100644
index 0000000..7311919
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+
+/**
+ * A class responsible for cleaning the PUBLIC and PRIVATE local caches on a
+ * node manager.
+ */
+class LocalCacheCleaner {
+
+  private long currentSize;
+  private final long targetSize;
+  private final DeletionService delService;
+  private final SortedMap<LocalizedResource, LocalResourcesTracker> resourceMap;
+
+  LocalCacheCleaner(DeletionService delService, long targetSize) {
+    this(delService, targetSize, new LRUComparator());
+  }
+
+  LocalCacheCleaner(DeletionService delService, long targetSize,
+      Comparator<? super LocalizedResource> cmp) {
+    this(delService, targetSize,
+        new TreeMap<LocalizedResource, LocalResourcesTracker>(cmp));
+  }
+
+  LocalCacheCleaner(DeletionService delService, long targetSize,
+      SortedMap<LocalizedResource, LocalResourcesTracker> resourceMap) {
+    this.resourceMap = resourceMap;
+    this.delService = delService;
+    this.targetSize = targetSize;
+  }
+
+  /**
+   * Adds resources from the passed LocalResourceTracker that are candidates for
+   * deletion from the cache.
+   *
+   * @param newTracker add all resources being tracked by the passed
+   *          LocalResourcesTracker to the LocalCacheCleaner.
+   */
+  public void addResources(LocalResourcesTracker newTracker) {
+    for (LocalizedResource resource : newTracker) {
+      currentSize += resource.getSize();
+      if (resource.getRefCount() > 0) {
+        // Do not delete resources that are still in use
+        continue;
+      }
+      resourceMap.put(resource, newTracker);
+    }
+  }
+
+  /**
+   * Delete resources from the cache in the sorted order generated by the
+   * Comparator used to construct this class.
+   *
+   * @return stats about what was cleaned up during this call of cleanCache
+   */
+  public LocalCacheCleanerStats cleanCache() {
+    LocalCacheCleanerStats stats = new LocalCacheCleanerStats(currentSize);
+    for (Iterator<Map.Entry<LocalizedResource, LocalResourcesTracker>> i =
+        resourceMap.entrySet().iterator();
+        currentSize - stats.totalDelSize > targetSize && i.hasNext();) {
+      Map.Entry<LocalizedResource, LocalResourcesTracker> rsrc = i.next();
+      LocalizedResource resource = rsrc.getKey();
+      LocalResourcesTracker tracker = rsrc.getValue();
+      if (tracker.remove(resource, delService)) {
+        stats.incDelSize(tracker.getUser(), resource.getSize());
+      }
+    }
+    this.resourceMap.clear();
+    return stats;
+  }
+
+  static class LocalCacheCleanerStats {
+    private final Map<String, Long> userDelSizes = new TreeMap<String, Long>();
+    private final long cacheSizeBeforeClean;
+    private long totalDelSize;
+    private long publicDelSize;
+    private long privateDelSize;
+
+    LocalCacheCleanerStats(long cacheSizeBeforeClean) {
+      this.cacheSizeBeforeClean = cacheSizeBeforeClean;
+    }
+
+    void incDelSize(String user, long delSize) {
+      totalDelSize += delSize;
+      if (user == null) {
+        publicDelSize += delSize;
+      } else {
+        privateDelSize += delSize;
+        Long userDel = userDelSizes.get(user);
+        if (userDel != null) {
+          userDel += delSize;
+          userDelSizes.put(user, userDel);
+        } else {
+          userDelSizes.put(user, delSize);
+        }
+      }
+    }
+
+    Map<String, Long> getUserDelSizes() {
+      return Collections.unmodifiableMap(userDelSizes);
+    }
+
+    long getCacheSizeBeforeClean() {
+      return cacheSizeBeforeClean;
+    }
+
+    long getTotalDelSize() {
+      return totalDelSize;
+    }
+
+    long getPublicDelSize() {
+      return publicDelSize;
+    }
+
+    long getPrivateDelSize() {
+      return privateDelSize;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Cache Size Before Clean: ").append(cacheSizeBeforeClean)
+          .append(", ");
+      sb.append("Total Deleted: ").append(totalDelSize).append(", ");
+      sb.append("Public Deleted: ").append(publicDelSize).append(", ");
+      sb.append("Private Deleted: ").append(privateDelSize);
+      return sb.toString();
+    }
+
+    public String toStringDetailed() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(this.toString());
+      sb.append(", Private Deleted Detail: {");
+      for (Map.Entry<String, Long> e : userDelSizes.entrySet()) {
+        sb.append(" ").append(e.getKey()).append(":").append(e.getValue());
+      }
+      sb.append(" }");
+      return sb.toString();
+    }
+  }
+
+  private static class LRUComparator implements Comparator<LocalizedResource>,
+      Serializable {
+
+    private static final long serialVersionUID = 7034380228434701685L;
+
+    public int compare(LocalizedResource r1, LocalizedResource r2) {
+      long ret = r1.getTimestamp() - r2.getTimestamp();
+      if (0 == ret) {
+        return System.identityHashCode(r1) - System.identityHashCode(r2);
+      }
+      return ret > 0 ? 1 : -1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index 38fffe6..940c599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -65,7 +65,8 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
   private final String user;
   private final ApplicationId appId;
   private final Dispatcher dispatcher;
-  private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
+  @VisibleForTesting
+  final ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc;
   private Configuration conf;
   private LocalDirsHandlerService dirsHandler;
   /*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 4bd004b..4cd1acc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -113,6 +113,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
@@ -152,7 +153,8 @@ public class ResourceLocalizationService extends CompositeService
 
   private Server server;
   private InetSocketAddress localizationServerAddress;
-  private long cacheTargetSize;
+  @VisibleForTesting
+  long cacheTargetSize;
   private long cacheCleanupPeriod;
 
   private final ContainerExecutor exec;
@@ -164,7 +166,8 @@ public class ResourceLocalizationService extends CompositeService
   private LocalizerTokenSecretManager secretManager;
   private NMStateStoreService stateStore;
 
-  private LocalResourcesTracker publicRsrc;
+  @VisibleForTesting
+  LocalResourcesTracker publicRsrc;
 
   private LocalDirsHandlerService dirsHandler;
   private DirsChangeListener localDirsChangeListener;
@@ -176,7 +179,8 @@ public class ResourceLocalizationService extends CompositeService
    * Map of LocalResourceTrackers keyed by username, for private
    * resources.
    */
-  private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
+  @VisibleForTesting
+  final ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
     new ConcurrentHashMap<String,LocalResourcesTracker>();
 
   /**
@@ -427,7 +431,7 @@ public class ResourceLocalizationService extends CompositeService
       handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
       break;
     case CACHE_CLEANUP:
-      handleCacheCleanup(event);
+      handleCacheCleanup();
       break;
     case CLEANUP_CONTAINER_RESOURCES:
       handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
@@ -512,20 +516,21 @@ public class ResourceLocalizationService extends CompositeService
     localizerTracker.endContainerLocalization(locId);
   }
 
-  private void handleCacheCleanup(LocalizationEvent event) {
-    ResourceRetentionSet retain =
-      new ResourceRetentionSet(delService, cacheTargetSize);
-    retain.addResources(publicRsrc);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Resource cleanup (public) " + retain);
-    }
+  @VisibleForTesting
+  LocalCacheCleanerStats handleCacheCleanup() {
+    LocalCacheCleaner cleaner =
+        new LocalCacheCleaner(delService, cacheTargetSize);
+    cleaner.addResources(publicRsrc);
     for (LocalResourcesTracker t : privateRsrc.values()) {
-      retain.addResources(t);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
-      }
+      cleaner.addResources(t);
+    }
+    LocalCacheCleaner.LocalCacheCleanerStats stats = cleaner.cleanCache();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(stats.toStringDetailed());
+    } else if (LOG.isInfoEnabled()) {
+      LOG.info(stats.toString());
     }
-    //TODO Check if appRsrcs should also be added to the retention set.
+    return stats;
   }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java
deleted file mode 100644
index 447a792..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-
-public class ResourceRetentionSet {
-
-  private long delSize;
-  private long currentSize;
-  private final long targetSize;
-  private final DeletionService delService;
-  private final SortedMap<LocalizedResource,LocalResourcesTracker> retain;
-
-  ResourceRetentionSet(DeletionService delService, long targetSize) {
-    this(delService, targetSize, new LRUComparator());
-  }
-
-  ResourceRetentionSet(DeletionService delService, long targetSize,
-      Comparator<? super LocalizedResource> cmp) {
-    this(delService, targetSize,
-        new TreeMap<LocalizedResource,LocalResourcesTracker>(cmp));
-  }
-
-  ResourceRetentionSet(DeletionService delService, long targetSize,
-      SortedMap<LocalizedResource,LocalResourcesTracker> retain) {
-    this.retain = retain;
-    this.delService = delService;
-    this.targetSize = targetSize;
-  }
-
-  public void addResources(LocalResourcesTracker newTracker) {
-    for (LocalizedResource resource : newTracker) {
-      currentSize += resource.getSize();
-      if (resource.getRefCount() > 0) {
-        // always retain resources in use
-        continue;
-      }
-      retain.put(resource, newTracker);
-    }
-    for (Iterator<Map.Entry<LocalizedResource,LocalResourcesTracker>> i =
-           retain.entrySet().iterator();
-         currentSize - delSize > targetSize && i.hasNext();) {
-      Map.Entry<LocalizedResource,LocalResourcesTracker> rsrc = i.next();
-      LocalizedResource resource = rsrc.getKey();
-      LocalResourcesTracker tracker = rsrc.getValue();
-      if (tracker.remove(resource, delService)) {
-        delSize += resource.getSize();
-        i.remove();
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("Cache: ").append(currentSize).append(", ");
-    sb.append("Deleted: ").append(delSize);
-    return sb.toString();
-  }
-
-  static class LRUComparator implements Comparator<LocalizedResource> {
-    public int compare(LocalizedResource r1, LocalizedResource r2) {
-      long ret = r1.getTimestamp() - r2.getTimestamp();
-      if (0 == ret) {
-        return System.identityHashCode(r1) - System.identityHashCode(r2);
-      }
-      return ret > 0 ? 1 : -1;
-    }
-    public boolean equals(Object other) {
-      return this == other;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java
new file mode 100644
index 0000000..d6db67a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java
@@ -0,0 +1,235 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
+import org.junit.Test;
+
+/**
+ * This class tests the clean up of local caches the node manager uses for the
+ * purpose of resource localization.
+ */
+public class TestLocalCacheCleanup {
+
+  @Test
+  public void testBasicCleanup() {
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    addResource(publicRsrc, "/pub-resource1.txt", 5, 20, 0);
+    addResource(publicRsrc, "/pub-resource2.txt", 3, 20, 0);
+    addResource(publicRsrc, "/pub-resource3.txt", 15, 20, 0);
+
+    ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
+        new ConcurrentHashMap<String, LocalResourcesTracker>();
+
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> user1rsrcs =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    addResource(user1rsrcs, "/private-u1-resource4.txt", 1, 20, 0);
+    LocalResourcesTracker user1Tracker =
+        new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs);
+    privateRsrc.put("user1", user1Tracker);
+
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> user2rsrcs =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    addResource(user2rsrcs, "/private-u2-resource5.txt", 2, 20, 0);
+    LocalResourcesTracker user2Tracker =
+        new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs);
+    privateRsrc.put("user2", user2Tracker);
+
+    ResourceLocalizationService rls =
+        createLocService(publicRsrc, privateRsrc, 0);
+    LocalCacheCleanerStats stats = rls.handleCacheCleanup();
+    assertEquals(0, ((StubbedLocalResourcesTrackerImpl) rls.publicRsrc)
+        .getLocalRsrc().size());
+    assertEquals(0,
+        ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1"))
+            .getLocalRsrc().size());
+    assertEquals(0,
+        ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
+            .getLocalRsrc().size());
+    assertEquals(100, stats.getTotalDelSize());
+    assertEquals(60, stats.getPublicDelSize());
+    assertEquals(40, stats.getPrivateDelSize());
+  }
+
+  @Test
+  public void testPositiveRefCount() {
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    // Oldest resource with a positive ref count the other with a ref count
+    // equal to 0.
+    LocalResourceRequest survivor =
+        addResource(publicRsrc, "/pub-resource1.txt", 1, 20, 1);
+    addResource(publicRsrc, "/pub-resource2.txt", 5, 20, 0);
+
+    ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
+        new ConcurrentHashMap<String, LocalResourcesTracker>();
+
+    ResourceLocalizationService rls =
+        createLocService(publicRsrc, privateRsrc, 0);
+    LocalCacheCleanerStats stats = rls.handleCacheCleanup();
+    StubbedLocalResourcesTrackerImpl resources =
+        (StubbedLocalResourcesTrackerImpl) rls.publicRsrc;
+    assertEquals(1, resources.getLocalRsrc().size());
+    assertTrue(resources.getLocalRsrc().containsKey(survivor));
+    assertEquals(20, stats.getTotalDelSize());
+    assertEquals(20, stats.getPublicDelSize());
+    assertEquals(0, stats.getPrivateDelSize());
+  }
+
+  @Test
+  public void testLRUAcrossTrackers() {
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    LocalResourceRequest pubSurviver1 =
+        addResource(publicRsrc, "/pub-resource1.txt", 8, 20, 0);
+    LocalResourceRequest pubSurviver2 =
+        addResource(publicRsrc, "/pub-resource2.txt", 7, 20, 0);
+    addResource(publicRsrc, "/pub-resource3.txt", 1, 20, 0);
+
+    ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
+        new ConcurrentHashMap<String, LocalResourcesTracker>();
+
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> user1rsrcs =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    LocalResourceRequest usr1Surviver1 =
+        addResource(user1rsrcs, "/private-u1-resource1.txt", 6, 20, 0);
+    addResource(user1rsrcs, "/private-u1-resource2.txt", 2, 20, 0);
+    LocalResourcesTracker user1Tracker =
+        new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs);
+    privateRsrc.put("user1", user1Tracker);
+
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> user2rsrcs =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    LocalResourceRequest usr2Surviver1 =
+        addResource(user2rsrcs, "/private-u2-resource1.txt", 5, 20, 0);
+    addResource(user2rsrcs, "/private-u2-resource2.txt", 3, 20, 0);
+    addResource(user2rsrcs, "/private-u2-resource3.txt", 4, 20, 0);
+    LocalResourcesTracker user2Tracker =
+        new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs);
+    privateRsrc.put("user2", user2Tracker);
+
+    ResourceLocalizationService rls =
+        createLocService(publicRsrc, privateRsrc, 80);
+    LocalCacheCleanerStats stats = rls.handleCacheCleanup();
+
+    Map<LocalResourceRequest, LocalizedResource> pubLocalRsrc =
+        ((StubbedLocalResourcesTrackerImpl) rls.publicRsrc).getLocalRsrc();
+    assertEquals(2, pubLocalRsrc.size());
+    assertTrue(pubLocalRsrc.containsKey(pubSurviver1));
+    assertTrue(pubLocalRsrc.containsKey(pubSurviver2));
+
+    Map<LocalResourceRequest, LocalizedResource> usr1LocalRsrc =
+        ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1"))
+            .getLocalRsrc();
+    assertEquals(1, usr1LocalRsrc.size());
+    assertTrue(usr1LocalRsrc.containsKey(usr1Surviver1));
+
+    Map<LocalResourceRequest, LocalizedResource> usr2LocalRsrc =
+        ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
+            .getLocalRsrc();
+    assertEquals(1, usr2LocalRsrc.size());
+    assertTrue(usr2LocalRsrc.containsKey(usr2Surviver1));
+
+    assertEquals(80, stats.getTotalDelSize());
+    assertEquals(20, stats.getPublicDelSize());
+    assertEquals(60, stats.getPrivateDelSize());
+  }
+
+  private ResourceLocalizationService createLocService(
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrcs,
+      ConcurrentMap<String, LocalResourcesTracker> privateRsrcs,
+      long targetCacheSize) {
+    Context mockedContext = mock(Context.class);
+    when(mockedContext.getNMStateStore()).thenReturn(null);
+    ResourceLocalizationService rls =
+        new ResourceLocalizationService(null, null, null, null, mockedContext);
+    // We set the following members directly so we don't have to deal with
+    // mocking out the service init method.
+    rls.publicRsrc = new StubbedLocalResourcesTrackerImpl(null, publicRsrcs);
+    rls.cacheTargetSize = targetCacheSize;
+    rls.privateRsrc.putAll(privateRsrcs);
+    return rls;
+  }
+
+  private LocalResourceRequest addResource(
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> resources,
+      String path, long timestamp, long size, int refCount) {
+    LocalResourceRequest request = createLocalResourceRequest(path, timestamp);
+    LocalizedResource resource =
+        createLocalizedResource(size, refCount, timestamp, request);
+    resources.put(request, resource);
+    return request;
+  }
+
+  private LocalResourceRequest createLocalResourceRequest(String path,
+      long timestamp) {
+    return new LocalResourceRequest(new Path(path), timestamp,
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
+  }
+
+  private LocalizedResource createLocalizedResource(long size, int refCount,
+      long timestamp, LocalResourceRequest req) {
+    LocalizedResource lr = mock(LocalizedResource.class);
+    when(lr.getSize()).thenReturn(size);
+    when(lr.getRefCount()).thenReturn(refCount);
+    when(lr.getTimestamp()).thenReturn(timestamp);
+    when(lr.getState()).thenReturn(ResourceState.LOCALIZED);
+    when(lr.getRequest()).thenReturn(req);
+    return lr;
+  }
+
+  class StubbedLocalResourcesTrackerImpl extends LocalResourcesTrackerImpl {
+    StubbedLocalResourcesTrackerImpl(String user,
+        ConcurrentMap<LocalResourceRequest, LocalizedResource> rsrcs) {
+      super(user, null, null, rsrcs, false, new Configuration(), null, null);
+    }
+
+    @Override
+    public boolean remove(LocalizedResource rem, DeletionService delService) {
+      LocalizedResource r = localrsrc.remove(rem.getRequest());
+      if (r != null) {
+        LOG.info("Removed " + rem.getRequest().getPath()
+            + " from localized cache");
+        return true;
+      }
+      return false;
+    }
+
+    Map<LocalResourceRequest, LocalizedResource> getLocalRsrc() {
+      return Collections.unmodifiableMap(localrsrc);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
deleted file mode 100644
index 81e69e2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.mockito.ArgumentCaptor;
-
-import static org.mockito.Mockito.*;
-
-public class TestResourceRetention {
-
-  @Test
-  public void testRsrcUnused() {
-    DeletionService delService = mock(DeletionService.class);
-    long TARGET_MB = 10 << 20;
-    ResourceRetentionSet rss = new ResourceRetentionSet(delService, TARGET_MB);
-    // 3MB files @{10, 15}
-    LocalResourcesTracker pubTracker =
-      createMockTracker(null, 3 * 1024 * 1024, 2, 10, 5);
-    // 1MB files @{3, 6, 9, 12}
-    LocalResourcesTracker trackerA =
-      createMockTracker("A", 1 * 1024 * 1024, 4, 3, 3);
-    // 4MB file @{1}
-    LocalResourcesTracker trackerB =
-      createMockTracker("B", 4 * 1024 * 1024, 1, 10, 5);
-    // 2MB files @{7, 9, 11}
-    LocalResourcesTracker trackerC =
-      createMockTracker("C", 2 * 1024 * 1024, 3, 7, 2);
-    // Total cache: 20MB; verify removed at least 10MB
-    rss.addResources(pubTracker);
-    rss.addResources(trackerA);
-    rss.addResources(trackerB);
-    rss.addResources(trackerC);
-    long deleted = 0L;
-    ArgumentCaptor<LocalizedResource> captor =
-      ArgumentCaptor.forClass(LocalizedResource.class);
-    verify(pubTracker, atMost(2))
-      .remove(captor.capture(), isA(DeletionService.class));
-    verify(trackerA, atMost(4))
-      .remove(captor.capture(), isA(DeletionService.class));
-    verify(trackerB, atMost(1))
-      .remove(captor.capture(), isA(DeletionService.class));
-    verify(trackerC, atMost(3))
-      .remove(captor.capture(), isA(DeletionService.class));
-    for (LocalizedResource rem : captor.getAllValues()) {
-      deleted += rem.getSize();
-    }
-    assertTrue(deleted >= 10 * 1024 * 1024);
-    assertTrue(deleted < 15 * 1024 * 1024);
-  }
-
-  LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
-      long nRsrcs, long timestamp, long tsstep) {
-    Configuration conf = new Configuration();
-    ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
-      new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
-    LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
-      null, trackerResources, false, conf, new NMNullStateStoreService(),null));
-    for (int i = 0; i < nRsrcs; ++i) {
-      final LocalResourceRequest req = new LocalResourceRequest(
-          new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,
-          LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
-      final long ts = timestamp + i * tsstep;
-      final Path p = new Path("file:///local/" + user + "/rsrc" + i);
-      LocalizedResource rsrc = new LocalizedResource(req, null) {
-        @Override public int getRefCount() { return 0; }
-        @Override public long getSize() { return rsrcSize; }
-        @Override public Path getLocalPath() { return p; }
-        @Override public long getTimestamp() { return ts; }
-        @Override
-        public ResourceState getState() { return ResourceState.LOCALIZED; }
-      };
-      trackerResources.put(req, rsrc);
-    }
-    return ret;
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org