You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2016/10/28 15:59:39 UTC
hadoop git commit: YARN-5767. Fix the order that resources are
cleaned up from the local Public/Private caches. Contributed by Chris Trezzo
Repository: hadoop
Updated Branches:
refs/heads/trunk 7146359bf -> 1b79c417d
YARN-5767. Fix the order that resources are cleaned up from the local Public/Private caches. Contributed by Chris Trezzo
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b79c417
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b79c417
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b79c417
Branch: refs/heads/trunk
Commit: 1b79c417dca17bcd2e031864bc6ca07254c61b47
Parents: 7146359
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Oct 28 15:58:04 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Oct 28 15:58:04 2016 +0000
----------------------------------------------------------------------
.../localizer/LocalCacheCleaner.java | 182 ++++++++++++++
.../localizer/LocalResourcesTrackerImpl.java | 3 +-
.../localizer/ResourceLocalizationService.java | 37 +--
.../localizer/ResourceRetentionSet.java | 96 --------
.../localizer/TestLocalCacheCleanup.java | 235 +++++++++++++++++++
.../localizer/TestResourceRetention.java | 106 ---------
6 files changed, 440 insertions(+), 219 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java
new file mode 100644
index 0000000..7311919
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheCleaner.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+
+/**
+ * A class responsible for cleaning the PUBLIC and PRIVATE local caches on a
+ * node manager.
+ *
+ * Intended use: register every tracker with {@link #addResources}, then call
+ * {@link #cleanCache} once. Candidates from all registered trackers are
+ * ordered together, so deletion order is decided across trackers rather than
+ * one tracker at a time.
+ */
+class LocalCacheCleaner {
+
+  /** Sum of getSize() over every resource seen, whether in use or not. */
+  private long currentSize;
+  /** Cache size above which cleanCache keeps deleting candidates. */
+  private final long targetSize;
+  /** Handed to LocalResourcesTracker.remove for every candidate. */
+  private final DeletionService delService;
+  /** Deletion candidates mapped to their owning tracker, in deletion order. */
+  private final SortedMap<LocalizedResource, LocalResourcesTracker> resourceMap;
+
+  /**
+   * Creates a cleaner that deletes resources in the order defined by
+   * LRUComparator (smallest timestamp first).
+   *
+   * @param delService passed through to LocalResourcesTracker.remove
+   * @param targetSize cache size to clean down to
+   */
+  LocalCacheCleaner(DeletionService delService, long targetSize) {
+    this(delService, targetSize, new LRUComparator());
+  }
+
+  /**
+   * Creates a cleaner that deletes resources in the order defined by the
+   * passed comparator.
+   *
+   * @param delService passed through to LocalResourcesTracker.remove
+   * @param targetSize cache size to clean down to
+   * @param cmp ordering of the deletion candidates; first is deleted first
+   */
+  LocalCacheCleaner(DeletionService delService, long targetSize,
+      Comparator<? super LocalizedResource> cmp) {
+    this(delService, targetSize,
+        new TreeMap<LocalizedResource, LocalResourcesTracker>(cmp));
+  }
+
+  /**
+   * Creates a cleaner that stores its deletion candidates in the passed map.
+   *
+   * @param delService passed through to LocalResourcesTracker.remove
+   * @param targetSize cache size to clean down to
+   * @param resourceMap sorted map that holds the candidates; its iteration
+   *          order is the deletion order
+   */
+  LocalCacheCleaner(DeletionService delService, long targetSize,
+      SortedMap<LocalizedResource, LocalResourcesTracker> resourceMap) {
+    this.resourceMap = resourceMap;
+    this.delService = delService;
+    this.targetSize = targetSize;
+  }
+
+  /**
+   * Adds resources from the passed LocalResourcesTracker that are candidates
+   * for deletion from the cache. Every resource counts towards the current
+   * cache size, but only resources with a ref count of zero or less become
+   * candidates.
+   *
+   * @param newTracker add all resources being tracked by the passed
+   *          LocalResourcesTracker to the LocalCacheCleaner.
+   */
+  public void addResources(LocalResourcesTracker newTracker) {
+    for (LocalizedResource resource : newTracker) {
+      currentSize += resource.getSize();
+      if (resource.getRefCount() > 0) {
+        // Do not delete resources that are still in use
+        continue;
+      }
+      resourceMap.put(resource, newTracker);
+    }
+  }
+
+  /**
+   * Delete resources from the cache in the sorted order generated by the
+   * Comparator used to construct this class. Deletion stops as soon as the
+   * size recorded by addResources minus the size deleted so far is no longer
+   * above the target size, or when there are no candidates left. A resource
+   * only counts as deleted when its tracker reports a successful remove.
+   *
+   * All registered candidates and the accumulated size are discarded before
+   * returning, so trackers have to be added again before the next call.
+   *
+   * @return stats about what was cleaned up during this call of cleanCache
+   */
+  public LocalCacheCleanerStats cleanCache() {
+    LocalCacheCleanerStats stats = new LocalCacheCleanerStats(currentSize);
+    for (Iterator<Map.Entry<LocalizedResource, LocalResourcesTracker>> i =
+        resourceMap.entrySet().iterator();
+        currentSize - stats.getTotalDelSize() > targetSize && i.hasNext();) {
+      Map.Entry<LocalizedResource, LocalResourcesTracker> rsrc = i.next();
+      LocalizedResource resource = rsrc.getKey();
+      LocalResourcesTracker tracker = rsrc.getValue();
+      if (tracker.remove(resource, delService)) {
+        stats.incDelSize(tracker.getUser(), resource.getSize());
+      }
+    }
+    this.resourceMap.clear();
+    // The size belongs to the candidates that were just dropped. Keeping it
+    // would make a reused cleaner report a stale "before" size and count the
+    // same resources twice on the next addResources.
+    this.currentSize = 0;
+    return stats;
+  }
+
+  /**
+   * Accumulates how much was deleted by one cleanCache call: in total, split
+   * into public and private, and per user for private resources.
+   */
+  static class LocalCacheCleanerStats {
+    private final Map<String, Long> userDelSizes = new TreeMap<String, Long>();
+    private final long cacheSizeBeforeClean;
+    private long totalDelSize;
+    private long publicDelSize;
+    private long privateDelSize;
+
+    LocalCacheCleanerStats(long cacheSizeBeforeClean) {
+      this.cacheSizeBeforeClean = cacheSizeBeforeClean;
+    }
+
+    /**
+     * Records one deleted resource.
+     *
+     * @param user owner reported by the tracker; null is counted as a public
+     *          resource, anything else as a private resource of that user
+     * @param delSize size of the deleted resource
+     */
+    void incDelSize(String user, long delSize) {
+      totalDelSize += delSize;
+      if (user == null) {
+        publicDelSize += delSize;
+      } else {
+        privateDelSize += delSize;
+        Long userDel = userDelSizes.get(user);
+        userDelSizes.put(user, userDel == null ? delSize : userDel + delSize);
+      }
+    }
+
+    /** @return read-only view of the deleted size per user, sorted by user */
+    Map<String, Long> getUserDelSizes() {
+      return Collections.unmodifiableMap(userDelSizes);
+    }
+
+    long getCacheSizeBeforeClean() {
+      return cacheSizeBeforeClean;
+    }
+
+    long getTotalDelSize() {
+      return totalDelSize;
+    }
+
+    long getPublicDelSize() {
+      return publicDelSize;
+    }
+
+    long getPrivateDelSize() {
+      return privateDelSize;
+    }
+
+    /** @return one-line summary without the per-user breakdown */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Cache Size Before Clean: ").append(cacheSizeBeforeClean)
+          .append(", ");
+      sb.append("Total Deleted: ").append(totalDelSize).append(", ");
+      sb.append("Public Deleted: ").append(publicDelSize).append(", ");
+      sb.append("Private Deleted: ").append(privateDelSize);
+      return sb.toString();
+    }
+
+    /** @return the toString() summary followed by the per-user breakdown */
+    public String toStringDetailed() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(this.toString());
+      sb.append(", Private Deleted Detail: {");
+      for (Map.Entry<String, Long> e : userDelSizes.entrySet()) {
+        sb.append(" ").append(e.getKey()).append(":").append(e.getValue());
+      }
+      sb.append(" }");
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Orders resources by ascending timestamp. Resources with the same
+   * timestamp are ordered by identity hash code so that distinct resources
+   * do not normally collapse into a single map key.
+   */
+  private static class LRUComparator implements Comparator<LocalizedResource>,
+      Serializable {
+
+    private static final long serialVersionUID = 7034380228434701685L;
+
+    @Override
+    public int compare(LocalizedResource r1, LocalizedResource r2) {
+      // NOTE(review): difference-based ordering is only valid while the two
+      // timestamps are less than 2^63 apart -- confirm against the time
+      // source used by LocalizedResource.
+      long ret = r1.getTimestamp() - r2.getTimestamp();
+      if (0 == ret) {
+        // Compare explicitly instead of subtracting: the difference of two
+        // ints can overflow and report the wrong sign.
+        // Two distinct resources that also share an identity hash code still
+        // compare as equal and would occupy a single map entry.
+        int h1 = System.identityHashCode(r1);
+        int h2 = System.identityHashCode(r2);
+        return h1 < h2 ? -1 : (h1 > h2 ? 1 : 0);
+      }
+      return ret > 0 ? 1 : -1;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index 38fffe6..940c599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -65,7 +65,8 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
private final String user;
private final ApplicationId appId;
private final Dispatcher dispatcher;
- private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
+ @VisibleForTesting
+ final ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc;
private Configuration conf;
private LocalDirsHandlerService dirsHandler;
/*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 4bd004b..4cd1acc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -113,6 +113,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
@@ -152,7 +153,8 @@ public class ResourceLocalizationService extends CompositeService
private Server server;
private InetSocketAddress localizationServerAddress;
- private long cacheTargetSize;
+ @VisibleForTesting
+ long cacheTargetSize;
private long cacheCleanupPeriod;
private final ContainerExecutor exec;
@@ -164,7 +166,8 @@ public class ResourceLocalizationService extends CompositeService
private LocalizerTokenSecretManager secretManager;
private NMStateStoreService stateStore;
- private LocalResourcesTracker publicRsrc;
+ @VisibleForTesting
+ LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler;
private DirsChangeListener localDirsChangeListener;
@@ -176,7 +179,8 @@ public class ResourceLocalizationService extends CompositeService
* Map of LocalResourceTrackers keyed by username, for private
* resources.
*/
- private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
+ @VisibleForTesting
+ final ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
new ConcurrentHashMap<String,LocalResourcesTracker>();
/**
@@ -427,7 +431,7 @@ public class ResourceLocalizationService extends CompositeService
handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
break;
case CACHE_CLEANUP:
- handleCacheCleanup(event);
+ handleCacheCleanup();
break;
case CLEANUP_CONTAINER_RESOURCES:
handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
@@ -512,20 +516,21 @@ public class ResourceLocalizationService extends CompositeService
localizerTracker.endContainerLocalization(locId);
}
- private void handleCacheCleanup(LocalizationEvent event) {
- ResourceRetentionSet retain =
- new ResourceRetentionSet(delService, cacheTargetSize);
- retain.addResources(publicRsrc);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Resource cleanup (public) " + retain);
- }
+ @VisibleForTesting
+ LocalCacheCleanerStats handleCacheCleanup() {
+ LocalCacheCleaner cleaner =
+ new LocalCacheCleaner(delService, cacheTargetSize);
+ cleaner.addResources(publicRsrc);
for (LocalResourcesTracker t : privateRsrc.values()) {
- retain.addResources(t);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
- }
+ cleaner.addResources(t);
+ }
+ LocalCacheCleaner.LocalCacheCleanerStats stats = cleaner.cleanCache();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(stats.toStringDetailed());
+ } else if (LOG.isInfoEnabled()) {
+ LOG.info(stats.toString());
}
- //TODO Check if appRsrcs should also be added to the retention set.
+ return stats;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java
deleted file mode 100644
index 447a792..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-
-public class ResourceRetentionSet {
-
- private long delSize;
- private long currentSize;
- private final long targetSize;
- private final DeletionService delService;
- private final SortedMap<LocalizedResource,LocalResourcesTracker> retain;
-
- ResourceRetentionSet(DeletionService delService, long targetSize) {
- this(delService, targetSize, new LRUComparator());
- }
-
- ResourceRetentionSet(DeletionService delService, long targetSize,
- Comparator<? super LocalizedResource> cmp) {
- this(delService, targetSize,
- new TreeMap<LocalizedResource,LocalResourcesTracker>(cmp));
- }
-
- ResourceRetentionSet(DeletionService delService, long targetSize,
- SortedMap<LocalizedResource,LocalResourcesTracker> retain) {
- this.retain = retain;
- this.delService = delService;
- this.targetSize = targetSize;
- }
-
- public void addResources(LocalResourcesTracker newTracker) {
- for (LocalizedResource resource : newTracker) {
- currentSize += resource.getSize();
- if (resource.getRefCount() > 0) {
- // always retain resources in use
- continue;
- }
- retain.put(resource, newTracker);
- }
- for (Iterator<Map.Entry<LocalizedResource,LocalResourcesTracker>> i =
- retain.entrySet().iterator();
- currentSize - delSize > targetSize && i.hasNext();) {
- Map.Entry<LocalizedResource,LocalResourcesTracker> rsrc = i.next();
- LocalizedResource resource = rsrc.getKey();
- LocalResourcesTracker tracker = rsrc.getValue();
- if (tracker.remove(resource, delService)) {
- delSize += resource.getSize();
- i.remove();
- }
- }
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Cache: ").append(currentSize).append(", ");
- sb.append("Deleted: ").append(delSize);
- return sb.toString();
- }
-
- static class LRUComparator implements Comparator<LocalizedResource> {
- public int compare(LocalizedResource r1, LocalizedResource r2) {
- long ret = r1.getTimestamp() - r2.getTimestamp();
- if (0 == ret) {
- return System.identityHashCode(r1) - System.identityHashCode(r2);
- }
- return ret > 0 ? 1 : -1;
- }
- public boolean equals(Object other) {
- return this == other;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java
new file mode 100644
index 0000000..d6db67a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java
@@ -0,0 +1,235 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
+import org.junit.Test;
+
+/**
+ * This class tests the clean up of local caches the node manager uses for the
+ * purpose of resource localization.
+ */
+public class TestLocalCacheCleanup {
+
+  /**
+   * With a target size of 0 every unreferenced resource has to go: 3 public
+   * and 2 private resources of size 20 each, so 100 deleted in total.
+   */
+  @Test
+  public void testBasicCleanup() {
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    addResource(publicRsrc, "/pub-resource1.txt", 5, 20, 0);
+    addResource(publicRsrc, "/pub-resource2.txt", 3, 20, 0);
+    addResource(publicRsrc, "/pub-resource3.txt", 15, 20, 0);
+
+    ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
+        new ConcurrentHashMap<String, LocalResourcesTracker>();
+
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> user1rsrcs =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    addResource(user1rsrcs, "/private-u1-resource4.txt", 1, 20, 0);
+    LocalResourcesTracker user1Tracker =
+        new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs);
+    privateRsrc.put("user1", user1Tracker);
+
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> user2rsrcs =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    addResource(user2rsrcs, "/private-u2-resource5.txt", 2, 20, 0);
+    LocalResourcesTracker user2Tracker =
+        new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs);
+    privateRsrc.put("user2", user2Tracker);
+
+    ResourceLocalizationService rls =
+        createLocService(publicRsrc, privateRsrc, 0);
+    LocalCacheCleanerStats stats = rls.handleCacheCleanup();
+    assertEquals(0, ((StubbedLocalResourcesTrackerImpl) rls.publicRsrc)
+        .getLocalRsrc().size());
+    assertEquals(0,
+        ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1"))
+            .getLocalRsrc().size());
+    assertEquals(0,
+        ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
+            .getLocalRsrc().size());
+    assertEquals(100, stats.getTotalDelSize());
+    assertEquals(60, stats.getPublicDelSize());
+    assertEquals(40, stats.getPrivateDelSize());
+  }
+
+  /**
+   * A resource that is still referenced must survive the cleanup even when
+   * it is the oldest one and the cache is above its target size.
+   */
+  @Test
+  public void testPositiveRefCount() {
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    // The oldest resource has a positive ref count; the newer one has a ref
+    // count of 0 and is therefore the only deletion candidate.
+    LocalResourceRequest survivor =
+        addResource(publicRsrc, "/pub-resource1.txt", 1, 20, 1);
+    addResource(publicRsrc, "/pub-resource2.txt", 5, 20, 0);
+
+    ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
+        new ConcurrentHashMap<String, LocalResourcesTracker>();
+
+    ResourceLocalizationService rls =
+        createLocService(publicRsrc, privateRsrc, 0);
+    LocalCacheCleanerStats stats = rls.handleCacheCleanup();
+    StubbedLocalResourcesTrackerImpl resources =
+        (StubbedLocalResourcesTrackerImpl) rls.publicRsrc;
+    assertEquals(1, resources.getLocalRsrc().size());
+    assertTrue(resources.getLocalRsrc().containsKey(survivor));
+    assertEquals(20, stats.getTotalDelSize());
+    assertEquals(20, stats.getPublicDelSize());
+    assertEquals(0, stats.getPrivateDelSize());
+  }
+
+  /**
+   * Eight resources of size 20 (160 in total) against a target of 80: the
+   * four with the smallest timestamps (1 to 4) have to be deleted no matter
+   * which tracker owns them, i.e. one public and three private resources.
+   */
+  @Test
+  public void testLRUAcrossTrackers() {
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    LocalResourceRequest pubSurvivor1 =
+        addResource(publicRsrc, "/pub-resource1.txt", 8, 20, 0);
+    LocalResourceRequest pubSurvivor2 =
+        addResource(publicRsrc, "/pub-resource2.txt", 7, 20, 0);
+    addResource(publicRsrc, "/pub-resource3.txt", 1, 20, 0);
+
+    ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
+        new ConcurrentHashMap<String, LocalResourcesTracker>();
+
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> user1rsrcs =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    LocalResourceRequest usr1Survivor1 =
+        addResource(user1rsrcs, "/private-u1-resource1.txt", 6, 20, 0);
+    addResource(user1rsrcs, "/private-u1-resource2.txt", 2, 20, 0);
+    LocalResourcesTracker user1Tracker =
+        new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs);
+    privateRsrc.put("user1", user1Tracker);
+
+    ConcurrentMap<LocalResourceRequest, LocalizedResource> user2rsrcs =
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+    LocalResourceRequest usr2Survivor1 =
+        addResource(user2rsrcs, "/private-u2-resource1.txt", 5, 20, 0);
+    addResource(user2rsrcs, "/private-u2-resource2.txt", 3, 20, 0);
+    addResource(user2rsrcs, "/private-u2-resource3.txt", 4, 20, 0);
+    LocalResourcesTracker user2Tracker =
+        new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs);
+    privateRsrc.put("user2", user2Tracker);
+
+    ResourceLocalizationService rls =
+        createLocService(publicRsrc, privateRsrc, 80);
+    LocalCacheCleanerStats stats = rls.handleCacheCleanup();
+
+    Map<LocalResourceRequest, LocalizedResource> pubLocalRsrc =
+        ((StubbedLocalResourcesTrackerImpl) rls.publicRsrc).getLocalRsrc();
+    assertEquals(2, pubLocalRsrc.size());
+    assertTrue(pubLocalRsrc.containsKey(pubSurvivor1));
+    assertTrue(pubLocalRsrc.containsKey(pubSurvivor2));
+
+    Map<LocalResourceRequest, LocalizedResource> usr1LocalRsrc =
+        ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1"))
+            .getLocalRsrc();
+    assertEquals(1, usr1LocalRsrc.size());
+    assertTrue(usr1LocalRsrc.containsKey(usr1Survivor1));
+
+    Map<LocalResourceRequest, LocalizedResource> usr2LocalRsrc =
+        ((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
+            .getLocalRsrc();
+    assertEquals(1, usr2LocalRsrc.size());
+    assertTrue(usr2LocalRsrc.containsKey(usr2Survivor1));
+
+    assertEquals(80, stats.getTotalDelSize());
+    assertEquals(20, stats.getPublicDelSize());
+    assertEquals(60, stats.getPrivateDelSize());
+  }
+
+  /**
+   * Builds a ResourceLocalizationService whose public tracker, private
+   * trackers and cache target size are the passed values.
+   */
+  private ResourceLocalizationService createLocService(
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrcs,
+      ConcurrentMap<String, LocalResourcesTracker> privateRsrcs,
+      long targetCacheSize) {
+    Context mockedContext = mock(Context.class);
+    when(mockedContext.getNMStateStore()).thenReturn(null);
+    ResourceLocalizationService rls =
+        new ResourceLocalizationService(null, null, null, null, mockedContext);
+    // We set the following members directly so we don't have to deal with
+    // mocking out the service init method.
+    rls.publicRsrc = new StubbedLocalResourcesTrackerImpl(null, publicRsrcs);
+    rls.cacheTargetSize = targetCacheSize;
+    rls.privateRsrc.putAll(privateRsrcs);
+    return rls;
+  }
+
+  /**
+   * Registers a mocked localized resource in the passed map.
+   *
+   * @return the request the resource was stored under
+   */
+  private LocalResourceRequest addResource(
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> resources,
+      String path, long timestamp, long size, int refCount) {
+    LocalResourceRequest request = createLocalResourceRequest(path, timestamp);
+    LocalizedResource resource =
+        createLocalizedResource(size, refCount, timestamp, request);
+    resources.put(request, resource);
+    return request;
+  }
+
+  private LocalResourceRequest createLocalResourceRequest(String path,
+      long timestamp) {
+    return new LocalResourceRequest(new Path(path), timestamp,
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
+  }
+
+  /**
+   * Mocks a resource in the LOCALIZED state that reports the passed size,
+   * ref count, timestamp and request.
+   */
+  private LocalizedResource createLocalizedResource(long size, int refCount,
+      long timestamp, LocalResourceRequest req) {
+    LocalizedResource lr = mock(LocalizedResource.class);
+    when(lr.getSize()).thenReturn(size);
+    when(lr.getRefCount()).thenReturn(refCount);
+    when(lr.getTimestamp()).thenReturn(timestamp);
+    when(lr.getState()).thenReturn(ResourceState.LOCALIZED);
+    when(lr.getRequest()).thenReturn(req);
+    return lr;
+  }
+
+  /**
+   * Tracker whose remove only drops the resource from its backing map and
+   * never uses the deletion service. Declared static because it needs
+   * nothing from the enclosing test instance.
+   */
+  static class StubbedLocalResourcesTrackerImpl
+      extends LocalResourcesTrackerImpl {
+    StubbedLocalResourcesTrackerImpl(String user,
+        ConcurrentMap<LocalResourceRequest, LocalizedResource> rsrcs) {
+      super(user, null, null, rsrcs, false, new Configuration(), null, null);
+    }
+
+    @Override
+    public boolean remove(LocalizedResource rem, DeletionService delService) {
+      LocalizedResource r = localrsrc.remove(rem.getRequest());
+      if (r != null) {
+        LOG.info("Removed " + rem.getRequest().getPath()
+            + " from localized cache");
+        return true;
+      }
+      return false;
+    }
+
+    /** @return read-only view of the resources still held by the tracker */
+    Map<LocalResourceRequest, LocalizedResource> getLocalRsrc() {
+      return Collections.unmodifiableMap(localrsrc);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b79c417/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
deleted file mode 100644
index 81e69e2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.mockito.ArgumentCaptor;
-
-import static org.mockito.Mockito.*;
-
-public class TestResourceRetention {
-
- @Test
- public void testRsrcUnused() {
- DeletionService delService = mock(DeletionService.class);
- long TARGET_MB = 10 << 20;
- ResourceRetentionSet rss = new ResourceRetentionSet(delService, TARGET_MB);
- // 3MB files @{10, 15}
- LocalResourcesTracker pubTracker =
- createMockTracker(null, 3 * 1024 * 1024, 2, 10, 5);
- // 1MB files @{3, 6, 9, 12}
- LocalResourcesTracker trackerA =
- createMockTracker("A", 1 * 1024 * 1024, 4, 3, 3);
- // 4MB file @{1}
- LocalResourcesTracker trackerB =
- createMockTracker("B", 4 * 1024 * 1024, 1, 10, 5);
- // 2MB files @{7, 9, 11}
- LocalResourcesTracker trackerC =
- createMockTracker("C", 2 * 1024 * 1024, 3, 7, 2);
- // Total cache: 20MB; verify removed at least 10MB
- rss.addResources(pubTracker);
- rss.addResources(trackerA);
- rss.addResources(trackerB);
- rss.addResources(trackerC);
- long deleted = 0L;
- ArgumentCaptor<LocalizedResource> captor =
- ArgumentCaptor.forClass(LocalizedResource.class);
- verify(pubTracker, atMost(2))
- .remove(captor.capture(), isA(DeletionService.class));
- verify(trackerA, atMost(4))
- .remove(captor.capture(), isA(DeletionService.class));
- verify(trackerB, atMost(1))
- .remove(captor.capture(), isA(DeletionService.class));
- verify(trackerC, atMost(3))
- .remove(captor.capture(), isA(DeletionService.class));
- for (LocalizedResource rem : captor.getAllValues()) {
- deleted += rem.getSize();
- }
- assertTrue(deleted >= 10 * 1024 * 1024);
- assertTrue(deleted < 15 * 1024 * 1024);
- }
-
- LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
- long nRsrcs, long timestamp, long tsstep) {
- Configuration conf = new Configuration();
- ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
- new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
- LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
- null, trackerResources, false, conf, new NMNullStateStoreService(),null));
- for (int i = 0; i < nRsrcs; ++i) {
- final LocalResourceRequest req = new LocalResourceRequest(
- new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,
- LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
- final long ts = timestamp + i * tsstep;
- final Path p = new Path("file:///local/" + user + "/rsrc" + i);
- LocalizedResource rsrc = new LocalizedResource(req, null) {
- @Override public int getRefCount() { return 0; }
- @Override public long getSize() { return rsrcSize; }
- @Override public Path getLocalPath() { return p; }
- @Override public long getTimestamp() { return ts; }
- @Override
- public ResourceState getState() { return ResourceState.LOCALIZED; }
- };
- trackerResources.put(req, rsrc);
- }
- return ret;
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org