You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/10/25 20:49:48 UTC
git commit: YARN-2183. [YARN-1492] Cleaner service for cache manager.
(Chris Trezzo and Sangjin Lee via kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk f44cf9959 -> c51e53d7a
YARN-2183. [YARN-1492] Cleaner service for cache manager. (Chris Trezzo and Sangjin Lee via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c51e53d7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c51e53d7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c51e53d7
Branch: refs/heads/trunk
Commit: c51e53d7aad46059f52d4046a5fedfdfd3c37955
Parents: f44cf99
Author: Karthik Kambatla <ka...@apache.org>
Authored: Sat Oct 25 10:31:06 2014 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Sat Oct 25 10:31:06 2014 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 47 ++-
.../src/main/resources/yarn-default.xml | 27 +-
.../server/sharedcache/SharedCacheUtil.java | 10 +
.../sharedcachemanager/CleanerService.java | 218 +++++++++++++
.../server/sharedcachemanager/CleanerTask.java | 308 +++++++++++++++++++
.../sharedcachemanager/SharedCacheManager.java | 7 +
.../metrics/CleanerMetrics.java | 172 +++++++++++
.../store/InMemorySCMStore.java | 45 ++-
.../sharedcachemanager/store/SCMStore.java | 36 ++-
.../sharedcachemanager/TestCleanerTask.java | 152 +++++++++
.../metrics/TestCleanerMetrics.java | 65 ++++
.../store/TestInMemorySCMStore.java | 4 +-
13 files changed, 1054 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0b620ec..19e82c1 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -39,6 +39,9 @@ Release 2.7.0 - UNRELEASED
YARN-2180. [YARN-1492] In-memory backing store for cache manager.
(Chris Trezzo via kasha)
+ YARN-2183. [YARN-1492] Cleaner service for cache manager.
+ (Chris Trezzo and Sangjin Lee via kasha)
+
IMPROVEMENTS
YARN-1979. TestDirectoryCollection fails when the umask is unusual.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 2d08fde..d552e9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1393,25 +1393,54 @@ public class YarnConfiguration extends Configuration {
* the last reference exceeds the staleness period. This value is specified in
* minutes.
*/
- public static final String IN_MEMORY_STALENESS_PERIOD =
- IN_MEMORY_STORE_PREFIX + "staleness-period";
- public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60;
+ public static final String IN_MEMORY_STALENESS_PERIOD_MINS =
+ IN_MEMORY_STORE_PREFIX + "staleness-period-mins";
+ // Default: 7 days, expressed in minutes (10080).
+ public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS =
+ 7 * 24 * 60;
/**
* Initial delay before the in-memory store runs its first check to remove
* dead initial applications. Specified in minutes.
*/
- public static final String IN_MEMORY_INITIAL_DELAY =
- IN_MEMORY_STORE_PREFIX + "initial-delay";
- public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10;
+ public static final String IN_MEMORY_INITIAL_DELAY_MINS =
+ IN_MEMORY_STORE_PREFIX + "initial-delay-mins";
+ public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10;
/**
* The frequency at which the in-memory store checks to remove dead initial
* applications. Specified in minutes.
*/
- public static final String IN_MEMORY_CHECK_PERIOD =
- IN_MEMORY_STORE_PREFIX + "check-period";
- public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60;
+ public static final String IN_MEMORY_CHECK_PERIOD_MINS =
+ IN_MEMORY_STORE_PREFIX + "check-period-mins";
+ // Default: 12 hours, expressed in minutes (720).
+ public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS = 12 * 60;
+
+ // SCM Cleaner service configuration
+ // (all keys below live under SHARED_CACHE_PREFIX + "cleaner.")
+
+ private static final String SCM_CLEANER_PREFIX = SHARED_CACHE_PREFIX
+ + "cleaner.";
+
+ /**
+ * The frequency at which a cleaner task runs. Specified in minutes.
+ * Must be greater than zero; the cleaner service rejects other values.
+ */
+ public static final String SCM_CLEANER_PERIOD_MINS =
+ SCM_CLEANER_PREFIX + "period-mins";
+ // Default: once a day (1440 minutes).
+ public static final int DEFAULT_SCM_CLEANER_PERIOD_MINS = 24 * 60;
+
+ /**
+ * Initial delay before the first cleaner task is scheduled. Specified in
+ * minutes.
+ * Must not be negative; the cleaner service rejects negative values.
+ */
+ public static final String SCM_CLEANER_INITIAL_DELAY_MINS =
+ SCM_CLEANER_PREFIX + "initial-delay-mins";
+ public static final int DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS = 10;
+
+ /**
+ * The time to sleep between processing each shared cache resource. Specified
+ * in milliseconds.
+ */
+ public static final String SCM_CLEANER_RESOURCE_SLEEP_MS =
+ SCM_CLEANER_PREFIX + "resource-sleep-ms";
+ // Default 0: the cleaner task does not pause between resources (it only
+ // sleeps when the configured value is positive).
+ public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L;
////////////////////////////////
// Other Configs
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index a64ed73..e364ba8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1380,24 +1380,45 @@
<description>A resource in the in-memory store is considered stale
if the time since the last reference exceeds the staleness period.
This value is specified in minutes.</description>
- <name>yarn.sharedcache.store.in-memory.staleness-period</name>
+ <name>yarn.sharedcache.store.in-memory.staleness-period-mins</name>
<value>10080</value>
</property>
<property>
<description>Initial delay before the in-memory store runs its first check
to remove dead initial applications. Specified in minutes.</description>
- <name>yarn.sharedcache.store.in-memory.initial-delay</name>
+ <name>yarn.sharedcache.store.in-memory.initial-delay-mins</name>
<value>10</value>
</property>
<property>
<description>The frequency at which the in-memory store checks to remove
dead initial applications. Specified in minutes.</description>
- <name>yarn.sharedcache.store.in-memory.check-period</name>
+ <name>yarn.sharedcache.store.in-memory.check-period-mins</name>
<value>720</value>
</property>
+ <property>
+ <description>The frequency at which a cleaner task runs.
+ Specified in minutes.</description>
+ <name>yarn.sharedcache.cleaner.period-mins</name>
+ <value>1440</value>
+ </property>
+
+ <property>
+ <description>Initial delay before the first cleaner task is scheduled.
+ Specified in minutes.</description>
+ <name>yarn.sharedcache.cleaner.initial-delay-mins</name>
+ <value>10</value>
+ </property>
+
+ <property>
+ <description>The time to sleep between processing each shared cache
+ resource. Specified in milliseconds.</description>
+ <name>yarn.sharedcache.cleaner.resource-sleep-ms</name>
+ <value>0</value>
+ </property>
+
<!-- Other configuration -->
<property>
<description>The interval that the yarn client library uses to poll the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
index 4b933ac..d3cf379 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
@@ -78,4 +78,14 @@ public class SharedCacheUtil {
return sb.toString();
}
+
+  /**
+   * Builds the glob pattern that matches shared cache entries nested
+   * {@code depth} directory levels below the cache root: one {@code "*"}
+   * wildcard per level followed by a final {@code "*"} for the entry itself,
+   * joined with {@code '/'}. A depth of zero (or less) yields {@code "*"}.
+   *
+   * @param depth number of intermediate directory levels
+   * @return the glob pattern, relative to the cache root
+   */
+  @Private
+  public static String getCacheEntryGlobPattern(int depth) {
+    StringBuilder glob = new StringBuilder();
+    for (int remaining = depth; remaining > 0; remaining--) {
+      glob.append("*/");
+    }
+    return glob.append('*').toString();
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
new file mode 100644
index 0000000..593be4a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The cleaner service that maintains the shared cache area, and cleans up stale
+ * entries on a regular basis.
+ *
+ * <p>On start it claims a global pid file under the shared cache root so that
+ * only one cleaner runs per cluster, then schedules a {@link CleanerTask} at a
+ * fixed rate. On stop it shuts the scheduler down and removes the pid file,
+ * but only if this instance is the one that created it.
+ */
+@Private
+@Evolving
+public class CleanerService extends CompositeService {
+  /**
+   * The name of the global cleaner lock that the cleaner creates to indicate
+   * that a cleaning process is in progress.
+   */
+  public static final String GLOBAL_CLEANER_PID = ".cleaner_pid";
+
+  private static final Log LOG = LogFactory.getLog(CleanerService.class);
+
+  private Configuration conf;
+  private CleanerMetrics metrics;
+  private ScheduledExecutorService scheduledExecutor;
+  private final SCMStore store;
+  private final Lock cleanerTaskLock;
+  /**
+   * Whether this instance created the global pid file. serviceStop() only
+   * removes the file when this is true. Stopping an instance that never
+   * acquired the file (for example, one whose start was rejected because the
+   * file already existed) therefore cannot delete another cleaner's pid file.
+   */
+  private boolean ownsGlobalCleanerPidFile = false;
+
+  public CleanerService(SCMStore store) {
+    super("CleanerService");
+    this.store = store;
+    this.cleanerTaskLock = new ReentrantLock();
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.conf = conf;
+
+    // create scheduler executor service that services the cleaner tasks
+    // use 2 threads to accommodate the on-demand tasks and reduce the chance of
+    // back-to-back runs
+    ThreadFactory tf =
+        new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build();
+    scheduledExecutor = Executors.newScheduledThreadPool(2, tf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // validate the schedule before claiming the cluster-wide pid file, so a
+    // misconfiguration fails fast without touching the shared file system
+    int initialDelayInMinutes = getInitialDelay(conf);
+    long periodInMinutes = getPeriod(conf);
+
+    if (!writeGlobalCleanerPidFile()) {
+      throw new YarnException("The global cleaner pid file already exists! " +
+          "It appears there is another CleanerService running in the cluster");
+    }
+
+    this.metrics = CleanerMetrics.initSingleton(conf);
+
+    // Start dependent services (i.e. AppChecker)
+    super.serviceStart();
+
+    Runnable task =
+        CleanerTask.create(conf, store, metrics, cleanerTaskLock);
+    scheduledExecutor.scheduleAtFixedRate(task, initialDelayInMinutes,
+        periodInMinutes, TimeUnit.MINUTES);
+    LOG.info("Scheduled the shared cache cleaner task to run every "
+        + periodInMinutes + " minutes.");
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    boolean interrupted = false;
+    // scheduledExecutor is only created in serviceInit(); guard against a stop
+    // that happens before initialization
+    if (scheduledExecutor != null) {
+      LOG.info("Shutting down the background thread.");
+      scheduledExecutor.shutdownNow();
+      try {
+        if (scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+          LOG.info("The background thread stopped.");
+        } else {
+          LOG.warn("Gave up waiting for the cleaner task to shutdown.");
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("The cleaner service was interrupted while shutting down the task.",
+            e);
+        // remember the interrupt and restore it once the remaining cleanup
+        // (pid file removal, stopping child services) has run
+        interrupted = true;
+      }
+    }
+
+    if (ownsGlobalCleanerPidFile) {
+      removeGlobalCleanerPidFile();
+    }
+
+    try {
+      super.serviceStop();
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Execute an on-demand cleaner task.
+   */
+  protected void runCleanerTask() {
+    Runnable task =
+        CleanerTask.create(conf, store, metrics, cleanerTaskLock);
+    // this is a non-blocking call (it simply submits the task to the executor
+    // queue and returns)
+    this.scheduledExecutor.execute(task);
+  }
+
+  /**
+   * To ensure there are not multiple instances of the SCM running on a given
+   * cluster, a global pid file is used. This file contains the hostname of the
+   * machine that owns the pid file.
+   *
+   * @return true if the pid file was written, false if it already existed
+   * @throws YarnException if any file system operation fails
+   */
+  private boolean writeGlobalCleanerPidFile() throws YarnException {
+    Path pidPath = getGlobalCleanerPidPath();
+    try {
+      FileSystem fs = FileSystem.get(this.conf);
+
+      if (fs.exists(pidPath)) {
+        return false;
+      }
+
+      // overwrite=false: create() fails if the file already exists
+      FSDataOutputStream os = fs.create(pidPath, false);
+      // the file is ours from here on; make sure it is removed on stop and
+      // when the JVM exits, even if writing its content fails below
+      ownsGlobalCleanerPidFile = true;
+      fs.deleteOnExit(pidPath);
+      try {
+        // write the hostname and the process id in the global cleaner pid file
+        os.writeUTF(ManagementFactory.getRuntimeMXBean().getName());
+      } finally {
+        os.close();
+      }
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    LOG.info("Created the global cleaner pid file at " + pidPath.toString());
+    return true;
+  }
+
+  /**
+   * Deletes the global pid file. Failures are logged, not thrown, because this
+   * runs during shutdown.
+   */
+  private void removeGlobalCleanerPidFile() {
+    Path pidPath = getGlobalCleanerPidPath();
+    try {
+      FileSystem fs = FileSystem.get(this.conf);
+      if (fs.delete(pidPath, false)) {
+        ownsGlobalCleanerPidFile = false;
+        LOG.info("Removed the global cleaner pid file at " + pidPath.toString());
+      } else {
+        LOG.warn("The global cleaner pid file at " + pidPath
+            + " could not be removed; it may have already been deleted.");
+      }
+    } catch (IOException e) {
+      LOG.error(
+          "Unable to remove the global cleaner pid file! The file may need "
+              + "to be removed manually.", e);
+    }
+  }
+
+  /**
+   * Returns the location of the global cleaner pid file under the configured
+   * shared cache root.
+   */
+  private Path getGlobalCleanerPidPath() {
+    String root =
+        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+    return new Path(root, GLOBAL_CLEANER_PID);
+  }
+
+  /**
+   * Reads the initial delay (in minutes) before the first scheduled run.
+   *
+   * @throws HadoopIllegalArgumentException if the configured value is negative
+   */
+  private static int getInitialDelay(Configuration conf) {
+    int initialDelayInMinutes =
+        conf.getInt(YarnConfiguration.SCM_CLEANER_INITIAL_DELAY_MINS,
+            YarnConfiguration.DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS);
+    // a negative value is invalid; reject it (zero means start immediately)
+    if (initialDelayInMinutes < 0) {
+      throw new HadoopIllegalArgumentException("Negative initial delay value: "
+          + initialDelayInMinutes
+          + ". The initial delay must be greater than or equal to zero.");
+    }
+    return initialDelayInMinutes;
+  }
+
+  /**
+   * Reads the period (in minutes) between scheduled runs.
+   *
+   * @throws HadoopIllegalArgumentException if the configured value is not
+   *           positive
+   */
+  private static int getPeriod(Configuration conf) {
+    int periodInMinutes =
+        conf.getInt(YarnConfiguration.SCM_CLEANER_PERIOD_MINS,
+            YarnConfiguration.DEFAULT_SCM_CLEANER_PERIOD_MINS);
+    // a non-positive value is invalid; reject it
+    if (periodInMinutes <= 0) {
+      throw new HadoopIllegalArgumentException("Non-positive period value: "
+          + periodInMinutes
+          + ". The cleaner period must be greater than zero.");
+    }
+    return periodInMinutes;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
new file mode 100644
index 0000000..a7fdcbd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+/**
+ * The task that runs and cleans up the shared cache area for stale entries and
+ * orphaned files. It is expected that only one cleaner task runs at any given
+ * point in time: {@link #run()} acquires the shared lock with
+ * {@code tryLock()} and does nothing if another task already holds it.
+ */
+@Private
+@Evolving
+class CleanerTask implements Runnable {
+  private static final String RENAMED_SUFFIX = "-renamed";
+  private static final Log LOG = LogFactory.getLog(CleanerTask.class);
+
+  private final String location;
+  private final long sleepTime;
+  private final int nestedLevel;
+  private final Path root;
+  private final FileSystem fs;
+  private final SCMStore store;
+  private final CleanerMetrics metrics;
+  private final Lock cleanerTaskLock;
+
+  /**
+   * Creates a cleaner task based on the configuration. This is provided for
+   * convenience.
+   *
+   * @param conf configuration supplying the shared cache root, the sleep time
+   *          between resources, the cache depth and the file system
+   * @param store the shared cache store consulted for each resource
+   * @param metrics metrics updated as resources are processed
+   * @param cleanerTaskLock lock that ensures a serial execution of cleaner
+   *          task
+   * @return an instance of a CleanerTask
+   * @throws ExceptionInInitializerError wrapping an IOException raised while
+   *           setting up the task (e.g. obtaining the file system)
+   */
+  public static CleanerTask create(Configuration conf, SCMStore store,
+      CleanerMetrics metrics, Lock cleanerTaskLock) {
+    try {
+      // get the root directory for the shared cache
+      String location =
+          conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+              YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+
+      long sleepTime =
+          conf.getLong(YarnConfiguration.SCM_CLEANER_RESOURCE_SLEEP_MS,
+              YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS);
+      int nestedLevel = SharedCacheUtil.getCacheDepth(conf);
+      FileSystem fs = FileSystem.get(conf);
+
+      return new CleanerTask(location, sleepTime, nestedLevel, fs, store,
+          metrics, cleanerTaskLock);
+    } catch (IOException e) {
+      LOG.error("Unable to obtain the filesystem for the cleaner service", e);
+      // NOTE(review): ExceptionInInitializerError is intended for static
+      // initializers; an unchecked exception would be more conventional, but
+      // the type is kept because callers may depend on it.
+      throw new ExceptionInInitializerError(e);
+    }
+  }
+
+  /**
+   * Creates a cleaner task based on the root directory location and the
+   * filesystem.
+   *
+   * @param location shared cache root directory
+   * @param sleepTime milliseconds to sleep after each resource (0 disables)
+   * @param nestedLevel number of directory levels above each cache entry
+   * @param fs file system hosting the shared cache
+   * @param store the shared cache store
+   * @param metrics metrics updated as resources are processed
+   * @param cleanerTaskLock lock that serializes cleaner task executions
+   */
+  CleanerTask(String location, long sleepTime, int nestedLevel, FileSystem fs,
+      SCMStore store, CleanerMetrics metrics, Lock cleanerTaskLock) {
+    this.location = location;
+    this.sleepTime = sleepTime;
+    this.nestedLevel = nestedLevel;
+    this.root = new Path(location);
+    this.fs = fs;
+    this.store = store;
+    this.metrics = metrics;
+    this.cleanerTaskLock = cleanerTaskLock;
+  }
+
+  @Override
+  public void run() {
+    if (!this.cleanerTaskLock.tryLock()) {
+      // there is already another task running
+      LOG.warn("A cleaner task is already running. "
+          + "This scheduled cleaner task will do nothing.");
+      return;
+    }
+
+    try {
+      if (!fs.exists(root)) {
+        LOG.error("The shared cache root " + location + " was not found. "
+            + "The cleaner task will do nothing.");
+        return;
+      }
+
+      // we're now ready to process the shared cache area
+      process();
+    } catch (Throwable e) {
+      // never let an exception escape: a periodic task scheduled with
+      // ScheduledExecutorService#scheduleAtFixedRate stops being re-run once
+      // an execution throws
+      LOG.error("Unexpected exception while running the cleaner task. "
+          + "This run will be aborted.", e);
+    } finally {
+      // release the lock regardless of whether this is a scheduled or an
+      // on-demand task
+      this.cleanerTaskLock.unlock();
+    }
+  }
+
+  /**
+   * Sweeps and processes the shared cache area to clean up stale and orphaned
+   * files.
+   */
+  void process() {
+    // mark the beginning of the run in the metrics
+    metrics.reportCleaningStart();
+    try {
+      // now traverse individual directories and process them
+      // the directory structure is specified by the nested level parameter
+      // (e.g. 9/c/d/<checksum>)
+      String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel);
+      FileStatus[] resources =
+          fs.globStatus(new Path(root, pattern));
+      int numResources = resources == null ? 0 : resources.length;
+      LOG.info("Processing " + numResources + " resources in the shared cache");
+      long beginMs = System.currentTimeMillis();
+      // number of entries actually visited; smaller than numResources when the
+      // task is interrupted part-way through
+      int numProcessed = 0;
+      if (resources != null) {
+        for (FileStatus resource : resources) {
+          // check for interruption so it can abort in a timely manner in case
+          // of shutdown
+          if (Thread.currentThread().isInterrupted()) {
+            LOG.warn("The cleaner task was interrupted. Aborting.");
+            break;
+          }
+
+          if (resource.isDirectory()) {
+            processSingleResource(resource);
+          } else {
+            LOG.warn("Invalid file at path " + resource.getPath().toString()
+                + " when a directory was expected");
+          }
+          numProcessed++;
+          // add sleep time between cleaning each directory if it is non-zero
+          if (sleepTime > 0) {
+            Thread.sleep(sleepTime);
+          }
+        }
+      }
+      long durationMs = System.currentTimeMillis() - beginMs;
+      LOG.info("Processed " + numProcessed + " resource(s) in " + durationMs +
+          " ms.");
+    } catch (IOException e1) {
+      LOG.error("Unable to complete the cleaner task", e1);
+    } catch (InterruptedException e2) {
+      LOG.warn("The cleaner task was interrupted. Aborting.");
+      Thread.currentThread().interrupt(); // restore the interrupt
+    }
+  }
+
+  /**
+   * Returns a path for the root directory for the shared cache.
+   */
+  Path getRootPath() {
+    return root;
+  }
+
+  /**
+   * Processes a single shared cache resource directory and records the outcome
+   * (deleted, processed or error) in the metrics.
+   */
+  void processSingleResource(FileStatus resource) {
+    Path path = resource.getPath();
+    // indicates the processing status of the resource
+    ResourceStatus resourceStatus = ResourceStatus.INIT;
+
+    // first, if the path ends with the renamed suffix, it indicates the
+    // directory was moved (as stale) but somehow not deleted (probably due to
+    // SCM failure); delete the directory
+    if (path.toString().endsWith(RENAMED_SUFFIX)) {
+      LOG.info("Found a renamed directory that was left undeleted at " +
+          path.toString() + ". Deleting.");
+      try {
+        if (fs.delete(path, true)) {
+          resourceStatus = ResourceStatus.DELETED;
+        } else {
+          // count a failed delete as an error rather than leaving the status
+          // unset; the directory is picked up again by a later run
+          LOG.error("Failed to delete the renamed directory " + path);
+          resourceStatus = ResourceStatus.ERROR;
+        }
+      } catch (IOException e) {
+        LOG.error("Error while processing a shared cache resource: " + path, e);
+        resourceStatus = ResourceStatus.ERROR;
+      }
+    } else {
+      // this is the path to the cache resource directory
+      // the directory name is the resource key (i.e. a unique identifier)
+      String key = path.getName();
+
+      try {
+        store.cleanResourceReferences(key);
+      } catch (YarnException e) {
+        LOG.error("Exception thrown while removing dead appIds.", e);
+      }
+
+      if (store.isResourceEvictable(key, resource)) {
+        try {
+          /*
+           * TODO See YARN-2663: There is a race condition between
+           * store.removeResource(key) and
+           * removeResourceFromCacheFileSystem(path) operations because they do
+           * not happen atomically and resources can be uploaded with different
+           * file names by the node managers.
+           */
+          // remove the resource from scm (checks for appIds as well)
+          if (store.removeResource(key)) {
+            // remove the resource from the file system
+            boolean deleted = removeResourceFromCacheFileSystem(path);
+            if (deleted) {
+              resourceStatus = ResourceStatus.DELETED;
+            } else {
+              LOG.error("Failed to remove path from the file system."
+                  + " Skipping this resource: " + path);
+              resourceStatus = ResourceStatus.ERROR;
+            }
+          } else {
+            // we did not delete the resource because it contained application
+            // ids
+            resourceStatus = ResourceStatus.PROCESSED;
+          }
+        } catch (IOException e) {
+          LOG.error(
+              "Failed to remove path from the file system. Skipping this resource: "
+                  + path, e);
+          resourceStatus = ResourceStatus.ERROR;
+        }
+      } else {
+        resourceStatus = ResourceStatus.PROCESSED;
+      }
+    }
+
+    // record the processing
+    switch (resourceStatus) {
+    case DELETED:
+      metrics.reportAFileDelete();
+      break;
+    case PROCESSED:
+      metrics.reportAFileProcess();
+      break;
+    case ERROR:
+      metrics.reportAFileError();
+      break;
+    default:
+      LOG.error("Cleaner encountered an invalid status (" + resourceStatus
+          + ") while processing resource: " + path.getName());
+    }
+  }
+
+  /**
+   * Renames the directory with the renamed suffix and then deletes it
+   * recursively.
+   *
+   * @return true if the directory was renamed and deleted, false otherwise
+   */
+  private boolean removeResourceFromCacheFileSystem(Path path)
+      throws IOException {
+    // rename the directory to make the delete atomic
+    Path renamedPath = new Path(path.toString() + RENAMED_SUFFIX);
+    if (fs.rename(path, renamedPath)) {
+      // the directory can be removed safely now
+      // log the original path
+      LOG.info("Deleting " + path.toString());
+      return fs.delete(renamedPath, true);
+    } else {
+      // we were unable to remove it for some reason: it's best to leave
+      // it at that
+      LOG.error("We were not able to rename the directory to "
+          + renamedPath.toString() + ". We will leave it intact.");
+    }
+    return false;
+  }
+
+  /**
+   * A status indicating what happened with the processing of a given cache
+   * resource.
+   */
+  private enum ResourceStatus {
+    /** Initial value; every code path must replace it before it is recorded **/
+    INIT,
+    /** Resource was successfully processed, but not deleted **/
+    PROCESSED,
+    /** Resource was successfully deleted **/
+    DELETED,
+    /** The cleaner task ran into an error while processing the resource **/
+    ERROR
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
index 2f3ddb1..3fdb588 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -64,6 +64,9 @@ public class SharedCacheManager extends CompositeService {
this.store = createSCMStoreService(conf);
addService(store);
+ CleanerService cs = createCleanerService(store);
+ addService(cs);
+
// init metrics
DefaultMetricsSystem.initialize("SharedCacheManager");
JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -90,6 +93,10 @@ public class SharedCacheManager extends CompositeService {
return store;
}
+  /**
+   * Creates the {@link CleanerService} that cleans the shared cache area using
+   * the given store.
+   */
+  private CleanerService createCleanerService(SCMStore store) {
+    CleanerService cleaner = new CleanerService(store);
+    return cleaner;
+  }
+
@Override
protected void serviceStop() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
new file mode 100644
index 0000000..5c8ea3d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * Maintains statistics about the shared cache cleaner's activity and
+ * publishes them through the metrics2 interfaces.
+ * <p>
+ * Metrics whose names start with {@code total} are counters accumulated over
+ * all cleaner runs. The remaining metrics are gauges that
+ * {@link #reportCleaningStart()} resets to zero, so they describe only the
+ * most recent run.
+ */
+@Private
+@Evolving
+@Metrics(name = "CleanerActivity", about = "Cleaner service metrics", context = "yarn")
+public class CleanerMetrics {
+  public static final Log LOG = LogFactory.getLog(CleanerMetrics.class);
+
+  // NOTE(review): not referenced in this class; presumably discovered
+  // reflectively by the metrics2 annotation machinery -- confirm before
+  // removing.
+  private final MetricsRegistry registry = new MetricsRegistry("cleaner");
+
+  /** Holder for the lazily created, process-wide instance. */
+  enum Singleton {
+    INSTANCE;
+
+    // volatile: getInstance() reads this without holding the lock taken by
+    // init(), so it must still see an instance published by another thread.
+    volatile CleanerMetrics impl;
+
+    synchronized CleanerMetrics init(Configuration conf) {
+      if (impl == null) {
+        impl = create(conf);
+      }
+      return impl;
+    }
+  }
+
+  /**
+   * Creates the singleton instance on the first call and returns it; later
+   * calls return the existing instance and ignore {@code conf}.
+   *
+   * @param conf configuration handed to {@link #create(Configuration)}
+   * @return the singleton instance
+   */
+  public static CleanerMetrics initSingleton(Configuration conf) {
+    return Singleton.INSTANCE.init(conf);
+  }
+
+  /**
+   * Returns the singleton instance.
+   *
+   * @return the instance created by {@link #initSingleton(Configuration)}
+   * @throws IllegalStateException if {@link #initSingleton(Configuration)}
+   *           has not been called yet
+   */
+  public static CleanerMetrics getInstance() {
+    CleanerMetrics topMetrics = Singleton.INSTANCE.impl;
+    if (topMetrics == null) {
+      throw new IllegalStateException(
+          "The CleanerMetrics singleton instance is not initialized."
+              + " Have you called initSingleton first?");
+    }
+    return topMetrics;
+  }
+
+  @Metric("number of deleted files over all runs")
+  private MutableCounterLong totalDeletedFiles;
+
+  /** @return number of files deleted over all runs */
+  public long getTotalDeletedFiles() {
+    return totalDeletedFiles.value();
+  }
+
+  @Metric("number of deleted files in the last run")
+  private MutableGaugeLong deletedFiles;
+
+  /** @return number of files deleted in the last run */
+  public long getDeletedFiles() {
+    return deletedFiles.value();
+  }
+
+  @Metric("number of processed files over all runs")
+  private MutableCounterLong totalProcessedFiles;
+
+  /** @return number of files processed over all runs */
+  public long getTotalProcessedFiles() {
+    return totalProcessedFiles.value();
+  }
+
+  @Metric("number of processed files in the last run")
+  private MutableGaugeLong processedFiles;
+
+  /** @return number of files processed in the last run */
+  public long getProcessedFiles() {
+    return processedFiles.value();
+  }
+
+  @Metric("number of file errors over all runs")
+  private MutableCounterLong totalFileErrors;
+
+  /** @return number of file errors over all runs */
+  public long getTotalFileErrors() {
+    return totalFileErrors.value();
+  }
+
+  @Metric("number of file errors in the last run")
+  private MutableGaugeLong fileErrors;
+
+  /** @return number of file errors in the last run */
+  public long getFileErrors() {
+    return fileErrors.value();
+  }
+
+  private CleanerMetrics() {
+  }
+
+  /**
+   * The metrics source built from this object's annotations; set by
+   * {@link #create(Configuration)}.
+   */
+  MetricsSource metricSource;
+
+  /**
+   * Builds a metrics source from this class's annotations and registers it
+   * with the default metrics system under the name {@code "cleaner"}.
+   *
+   * @param conf currently unused
+   * @return the newly created and registered instance
+   */
+  static CleanerMetrics create(Configuration conf) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+
+    CleanerMetrics metricObject = new CleanerMetrics();
+    MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(metricObject);
+    final MetricsSource s = sb.build();
+    ms.register("cleaner", "The cleaner service of truly shared cache", s);
+    metricObject.metricSource = s;
+    return metricObject;
+  }
+
+  /**
+   * Records a file that was processed and deleted: increments both the
+   * processed and the deleted counts, for the current run and overall.
+   */
+  public void reportAFileDelete() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+    totalDeletedFiles.incr();
+    deletedFiles.incr();
+  }
+
+  /**
+   * Records a file that was processed without being deleted: increments only
+   * the processed counts, for the current run and overall.
+   */
+  public void reportAFileProcess() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+  }
+
+  /**
+   * Records a file whose processing failed. Such a file still counts as
+   * processed, so both the processed and the error counts are incremented,
+   * for the current run and overall.
+   */
+  public void reportAFileError() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+    totalFileErrors.incr();
+    fileErrors.incr();
+  }
+
+  /**
+   * Reports the start of a new cleaner run by resetting the per-run gauges
+   * (processed, deleted and error counts) to zero. The overall counters are
+   * left untouched.
+   */
+  public void reportCleaningStart() {
+    processedFiles.set(0);
+    deletedFiles.set(0);
+    fileErrors.set(0);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
index 79369d8..b8fe14f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
-import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -83,13 +82,12 @@ public class InMemorySCMStore extends SCMStore {
private final Object initialAppsLock = new Object();
private long startTime;
private int stalenessMinutes;
- private AppChecker appChecker;
private ScheduledExecutorService scheduler;
private int initialDelayMin;
private int checkPeriodMin;
- public InMemorySCMStore() {
- super(InMemorySCMStore.class.getName());
+ /**
+ * Creates the store.
+ *
+ * @param appChecker handed to {@link SCMStore}, which uses it to decide
+ * whether the application behind a resource reference is still active
+ */
+ public InMemorySCMStore(AppChecker appChecker) {
+ super(InMemorySCMStore.class.getName(), appChecker);
}
private String intern(String key) {
@@ -108,9 +106,6 @@ public class InMemorySCMStore extends SCMStore {
this.checkPeriodMin = getCheckPeriod(conf);
this.stalenessMinutes = getStalenessPeriod(conf);
- appChecker = createAppCheckerService(conf);
- addService(appChecker);
-
bootstrap(conf);
ThreadFactory tf =
@@ -157,11 +152,6 @@ public class InMemorySCMStore extends SCMStore {
super.serviceStop();
}
- @VisibleForTesting
- AppChecker createAppCheckerService(Configuration conf) {
- return SharedCacheManager.createAppCheckerService(conf);
- }
-
private void bootstrap(Configuration conf) throws IOException {
Map<String, String> initialCachedResources =
getInitialCachedResources(FileSystem.get(conf), conf);
@@ -201,14 +191,10 @@ public class InMemorySCMStore extends SCMStore {
// now traverse individual directories and process them
// the directory structure is specified by the nested level parameter
// (e.g. 9/c/d/<checksum>/file)
- StringBuilder pattern = new StringBuilder();
- for (int i = 0; i < nestedLevel + 1; i++) {
- pattern.append("*/");
- }
- pattern.append("*");
+ String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel+1);
LOG.info("Querying for all individual cached resource files");
- FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString()));
+ FileStatus[] entries = fs.globStatus(new Path(root, pattern));
int numEntries = entries == null ? 0 : entries.length;
LOG.info("Found " + numEntries + " files: processing for one resource per "
+ "key");
@@ -360,6 +346,17 @@ public class InMemorySCMStore extends SCMStore {
}
/**
+   * Removes the references to the resource identified by {@code key} that
+   * belong to applications which are no longer active, while holding the
+   * lock of the key object returned by {@code intern(key)}. Presumably the
+   * other per-key operations of this store lock the same object, which makes
+   * the read-check-remove sequence of
+   * {@link SCMStore#cleanResourceReferences(String)} atomic with respect to
+   * them -- confirm when adding new per-key operations.
+   *
+   * @param key a unique identifier for a resource
+   * @throws YarnException if the superclass implementation throws it
+   */
+  @Override
+  public void cleanResourceReferences(String key) throws YarnException {
+    final String keyLock = intern(key);
+    synchronized (keyLock) {
+      super.cleanResourceReferences(key);
+    }
+  }
+
+ /**
* Removes the given resource from the store. Returns true if the resource is
* found and removed or if the resource is not found. Returns false if it was
* unable to remove the resource because the resource reference list was not
@@ -427,8 +424,8 @@ public class InMemorySCMStore extends SCMStore {
private static int getStalenessPeriod(Configuration conf) {
int stalenessMinutes =
- conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD,
- YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD);
+ conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD_MINS,
+ YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS);
// non-positive value is invalid; use the default
if (stalenessMinutes <= 0) {
throw new HadoopIllegalArgumentException("Non-positive staleness value: "
@@ -440,8 +437,8 @@ public class InMemorySCMStore extends SCMStore {
private static int getInitialDelay(Configuration conf) {
int initialMinutes =
- conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY,
- YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY);
+ conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY_MINS,
+ YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS);
// non-positive value is invalid; use the default
if (initialMinutes <= 0) {
throw new HadoopIllegalArgumentException(
@@ -453,8 +450,8 @@ public class InMemorySCMStore extends SCMStore {
private static int getCheckPeriod(Configuration conf) {
int checkMinutes =
- conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD,
- YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD);
+ conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD_MINS,
+ YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS);
// non-positive value is invalid; use the default
if (checkMinutes <= 0) {
throw new HadoopIllegalArgumentException(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
index 397d904..6be00b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
@@ -19,11 +19,15 @@
package org.apache.hadoop.yarn.server.sharedcachemanager.store;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
/**
@@ -35,8 +39,11 @@ import org.apache.hadoop.service.CompositeService;
@Evolving
public abstract class SCMStore extends CompositeService {
- protected SCMStore(String name) {
+ /**
+ * Used by {@link #cleanResourceReferences(String)} to decide whether the
+ * application referenced by a resource reference is still active.
+ */
+ protected final AppChecker appChecker;
+
+ /**
+ * @param name the service name
+ * @param appChecker checker used to decide whether applications referenced
+ * by resource references are still active
+ */
+ protected SCMStore(String name, AppChecker appChecker) {
super(name);
+ this.appChecker = appChecker;
}
/**
@@ -119,6 +126,33 @@ public abstract class SCMStore extends CompositeService {
Collection<SharedCacheResourceReference> refs, boolean updateAccessTime);
/**
+   * Clean all resource references to a cache resource that contain application
+   * ids pointing to finished applications. If the resource key does not exist,
+   * do nothing.
+   * <p>
+   * This default implementation takes no lock: it reads the references, asks
+   * the {@link AppChecker} about each application, and then removes the stale
+   * references. Subclasses that need the sequence to be atomic should override
+   * this method and add their own locking.
+   *
+   * @param key a unique identifier for a resource
+   * @throws YarnException if one of the calls made here (e.g. the application
+   *           activity check) fails
+   */
+  @Private
+  public void cleanResourceReferences(String key) throws YarnException {
+    Collection<SharedCacheResourceReference> refs = getResourceReferences(key);
+    // tolerate implementations that report a missing key with null, so the
+    // documented "do nothing" contract holds either way
+    if (refs == null || refs.isEmpty()) {
+      return;
+    }
+    Set<SharedCacheResourceReference> refsToRemove =
+        new HashSet<SharedCacheResourceReference>();
+    for (SharedCacheResourceReference r : refs) {
+      if (!appChecker.isApplicationActive(r.getAppId())) {
+        // application in resource reference is dead, it is safe to remove the
+        // reference
+        refsToRemove.add(r);
+      }
+    }
+    if (!refsToRemove.isEmpty()) {
+      // false: cleaning up dead references must not refresh the access time
+      removeResourceReferences(key, refsToRemove, false);
+    }
+  }
+
+ /**
* Check if a specific resource is evictable according to the store's enabled
* cache eviction policies.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
new file mode 100644
index 0000000..421b5bb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link CleanerTask}. The file system, the SCM store and the
+ * cleaner metrics are all mocks, so each test drives the task directly and
+ * verifies the interactions it has with them.
+ */
+public class TestCleanerTask {
+  private static final String ROOT =
+      YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT;
+  private static final long SLEEP_TIME =
+      YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS;
+  private static final int NESTED_LEVEL =
+      YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
+
+  @Test
+  public void testNonExistentRoot() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+    // the shared cache root does not exist
+    when(fs.exists(task.getRootPath())).thenReturn(false);
+
+    task.run();
+
+    // process() should not be called
+    verify(task, never()).process();
+  }
+
+  @Test
+  public void testProcessFreshResource() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+
+    // mock a resource that is not evictable
+    when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+        .thenReturn(false);
+    FileStatus status = mock(FileStatus.class);
+    when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+
+    // process the resource
+    task.processSingleResource(status);
+
+    // the directory should not be renamed
+    verify(fs, never()).rename(eq(status.getPath()), isA(Path.class));
+    // metrics should record a processed file (but not delete)
+    verify(metrics).reportAFileProcess();
+    verify(metrics, never()).reportAFileDelete();
+  }
+
+  @Test
+  public void testProcessEvictableResource() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+
+    // mock an evictable resource
+    when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+        .thenReturn(true);
+    FileStatus status = mock(FileStatus.class);
+    when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+    when(store.removeResource(isA(String.class))).thenReturn(true);
+    // rename succeeds
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+    // delete returns true
+    when(fs.delete(isA(Path.class), anyBoolean())).thenReturn(true);
+
+    // process the resource
+    task.processSingleResource(status);
+
+    // the directory should be renamed
+    verify(fs).rename(eq(status.getPath()), isA(Path.class));
+    // metrics should record a deleted file
+    verify(metrics).reportAFileDelete();
+    verify(metrics, never()).reportAFileProcess();
+  }
+
+  @Test
+  public void testResourceIsInUseHasAnActiveApp() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    FileStatus resource = mock(FileStatus.class);
+    when(resource.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+    // resource is stale
+    when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+        .thenReturn(true);
+    // but still has appIds
+    when(store.removeResource(isA(String.class))).thenReturn(false);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+
+    // process the resource
+    task.processSingleResource(resource);
+
+    // the store refused removal, so the directory must be left in place
+    verify(fs, never()).rename(eq(resource.getPath()), isA(Path.class));
+    // metrics should record a processed file (but not delete)
+    verify(metrics).reportAFileProcess();
+    verify(metrics, never()).reportAFileDelete();
+  }
+
+  /**
+   * Creates a Mockito spy around a real {@link CleanerTask} built from the
+   * given collaborators, so tests can verify calls the task makes on itself.
+   */
+  private CleanerTask createSpiedTask(FileSystem fs, SCMStore store,
+      CleanerMetrics metrics, Lock isCleanerRunning) {
+    return spy(new CleanerTask(ROOT, SLEEP_TIME, NESTED_LEVEL, fs, store,
+        metrics, isCleanerRunning));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
new file mode 100644
index 0000000..26ab179
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that {@link CleanerMetrics} keeps per-run gauges and overall counters
+ * consistent across several simulated cleaner runs.
+ */
+public class TestCleanerMetrics {
+
+  private final Configuration conf = new Configuration();
+  private CleanerMetrics cleanerMetrics;
+
+  @Before
+  public void init() {
+    CleanerMetrics.initSingleton(conf);
+    cleanerMetrics = CleanerMetrics.getInstance();
+  }
+
+  @Test
+  public void testMetricsOverMultiplePeriods() {
+    simulateACleanerRun();
+    assertMetrics(5, 5, 1, 1, 1, 1);
+    simulateACleanerRun();
+    assertMetrics(5, 10, 1, 2, 1, 2);
+  }
+
+  /**
+   * Simulates one cleaner run: four files processed, one of which was
+   * deleted, plus one file that failed. An erroneous file also counts as
+   * processed, so the run processes five files in total.
+   */
+  private void simulateACleanerRun() {
+    cleanerMetrics.reportCleaningStart();
+    cleanerMetrics.reportAFileProcess();
+    cleanerMetrics.reportAFileDelete();
+    cleanerMetrics.reportAFileProcess();
+    cleanerMetrics.reportAFileProcess();
+    cleanerMetrics.reportAFileError();
+  }
+
+  private void assertMetrics(int proc, int totalProc, int del, int totalDel,
+      int err, int totalErr) {
+    assertEquals(
+        "Processed files in the last period are not measured correctly", proc,
+        cleanerMetrics.getProcessedFiles());
+    assertEquals("Total processed files are not measured correctly",
+        totalProc, cleanerMetrics.getTotalProcessedFiles());
+    assertEquals(
+        "Deleted files in the last period are not measured correctly", del,
+        cleanerMetrics.getDeletedFiles());
+    assertEquals("Total deleted files are not measured correctly",
+        totalDel, cleanerMetrics.getTotalDeletedFiles());
+    assertEquals(
+        "File errors in the last period are not measured correctly", err,
+        cleanerMetrics.getFileErrors());
+    assertEquals("Total file errors are not measured correctly",
+        totalErr, cleanerMetrics.getTotalFileErrors());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c51e53d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
index 891703e..831ef6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
@@ -60,10 +60,8 @@ public class TestInMemorySCMStore {
@Before
public void setup() {
- this.store = spy(new InMemorySCMStore());
this.checker = spy(new DummyAppChecker());
- doReturn(checker).when(store).createAppCheckerService(
- isA(Configuration.class));
+ // the store takes the checker as a constructor argument, so the checker
+ // has to be created before the store
+ this.store = spy(new InMemorySCMStore(checker));
}
@After