You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/10/10 02:16:29 UTC

git commit: YARN-2180. [YARN-1492] In-memory backing store for cache manager. (Chris Trezzo via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk d8d628d1e -> 4f426fe22


YARN-2180. [YARN-1492] In-memory backing store for cache manager. (Chris Trezzo via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4f426fe2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4f426fe2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4f426fe2

Branch: refs/heads/trunk
Commit: 4f426fe2232ed90d8fdf8619fbdeae28d788b5c8
Parents: d8d628d
Author: Karthik Kambatla <ka...@apache.org>
Authored: Thu Oct 9 17:16:06 2014 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Thu Oct 9 17:16:06 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  43 ++
 .../src/main/resources/yarn-default.xml         |  34 ++
 .../sharedcache/SharedCacheStructureUtil.java   |  81 ---
 .../server/sharedcache/SharedCacheUtil.java     |  81 +++
 .../pom.xml                                     |   4 +
 .../sharedcachemanager/SharedCacheManager.java  |  58 +++
 .../store/InMemorySCMStore.java                 | 514 +++++++++++++++++++
 .../sharedcachemanager/store/SCMStore.java      | 133 +++++
 .../store/SharedCacheResource.java              |  64 +++
 .../store/SharedCacheResourceReference.java     |  86 ++++
 .../store/TestInMemorySCMStore.java             | 334 ++++++++++++
 12 files changed, 1354 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 85a88c8..1633cf7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -36,6 +36,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2179. [YARN-1492] Initial cache manager structure and context. 
     (Chris Trezzo via kasha) 
 
+    YARN-2180. [YARN-1492] In-memory backing store for cache manager. 
+    (Chris Trezzo via kasha)
+
   IMPROVEMENTS
 
     YARN-1979. TestDirectoryCollection fails when the umask is unusual.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1a2aa1d..f183a90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1332,6 +1332,49 @@ public class YarnConfiguration extends Configuration {
       SHARED_CACHE_PREFIX + "nested-level";
   public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3;
   
+  // Shared Cache Manager Configs
+
+  public static final String SCM_STORE_PREFIX = SHARED_CACHE_PREFIX + "store.";
+
+  public static final String SCM_STORE_CLASS = SCM_STORE_PREFIX + "class";
+  public static final String DEFAULT_SCM_STORE_CLASS =
+      "org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore";
+
+  public static final String SCM_APP_CHECKER_CLASS = SHARED_CACHE_PREFIX
+      + "app-checker.class";
+  public static final String DEFAULT_SCM_APP_CHECKER_CLASS =
+      "org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker";
+
+  // In-memory SCM store configuration
+  
+  // Nested under the SCM store prefix so the resulting keys match the names
+  // documented in yarn-default.xml (yarn.sharedcache.store.in-memory.*).
+  public static final String IN_MEMORY_STORE_PREFIX =
+      SCM_STORE_PREFIX + "in-memory.";
+
+  /**
+   * A resource in the InMemorySCMStore is considered stale if the time since
+   * the last reference exceeds the staleness period. This value is specified in
+   * minutes.
+   */
+  public static final String IN_MEMORY_STALENESS_PERIOD =
+      IN_MEMORY_STORE_PREFIX + "staleness-period";
+  public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60;
+
+  /**
+   * Initial delay before the in-memory store runs its first check to remove
+   * dead initial applications. Specified in minutes.
+   */
+  public static final String IN_MEMORY_INITIAL_DELAY =
+      IN_MEMORY_STORE_PREFIX + "initial-delay";
+  public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10;
+  
+  /**
+   * The frequency at which the in-memory store checks to remove dead initial
+   * applications. Specified in minutes.
+   */
+  public static final String IN_MEMORY_CHECK_PERIOD =
+      IN_MEMORY_STORE_PREFIX + "check-period";
+  public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1db7939..5d53191 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1342,6 +1342,40 @@
     <value>3</value>
   </property>
 
+  <property>
+    <description>The implementation to be used for the SCM store</description>
+    <name>yarn.sharedcache.store.class</name>
+    <value>org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore</value>
+  </property>
+
+  <property>
+    <description>The implementation to be used for the SCM app-checker</description>
+    <name>yarn.sharedcache.app-checker.class</name>
+    <value>org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker</value>
+  </property>
+  
+  <property>
+    <description>A resource in the in-memory store is considered stale
+    if the time since the last reference exceeds the staleness period.
+    This value is specified in minutes.</description>
+    <name>yarn.sharedcache.store.in-memory.staleness-period</name>
+    <value>10080</value>
+  </property>
+  
+  <property>
+    <description>Initial delay before the in-memory store runs its first check
+    to remove dead initial applications. Specified in minutes.</description>
+    <name>yarn.sharedcache.store.in-memory.initial-delay</name>
+    <value>10</value>
+  </property>
+  
+  <property>
+    <description>The frequency at which the in-memory store checks to remove
+    dead initial applications. Specified in minutes.</description>
+    <name>yarn.sharedcache.store.in-memory.check-period</name>
+    <value>720</value>
+  </property>
+
   <!-- Other configuration -->
   <property>
     <description>The interval that the yarn client library uses to poll the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java
deleted file mode 100644
index 1bac75b..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.sharedcache;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * A utility class that contains helper methods for dealing with the internal
- * shared cache structure.
- */
-@Private
-@Unstable
-public class SharedCacheStructureUtil {
-
-  private static final Log LOG = LogFactory.getLog(SharedCacheStructureUtil.class);
-
-  @Private
-  public static int getCacheDepth(Configuration conf) {
-    int cacheDepth =
-        conf.getInt(YarnConfiguration.SHARED_CACHE_NESTED_LEVEL,
-            YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL);
-
-    if (cacheDepth <= 0) {
-      LOG.warn("Specified cache depth was less than or equal to zero."
-          + " Using default value instead. Default: "
-          + YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL
-          + ", Specified: " + cacheDepth);
-      cacheDepth = YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
-    }
-
-    return cacheDepth;
-  }
-
-  @Private
-  public static String getCacheEntryPath(int cacheDepth, String cacheRoot,
-      String checksum) {
-
-    if (cacheDepth <= 0) {
-      throw new IllegalArgumentException(
-          "The cache depth must be greater than 0. Passed value: " + cacheDepth);
-    }
-    if (checksum.length() < cacheDepth) {
-      throw new IllegalArgumentException("The checksum passed was too short: "
-          + checksum);
-    }
-
-    // Build the cache entry path to the specified depth. For example, if the
-    // depth is 3 and the checksum is 3c4f, the path would be:
-    // SHARED_CACHE_ROOT/3/c/4/3c4f
-    StringBuilder sb = new StringBuilder(cacheRoot);
-    for (int i = 0; i < cacheDepth; i++) {
-      sb.append(Path.SEPARATOR_CHAR);
-      sb.append(checksum.charAt(i));
-    }
-    sb.append(Path.SEPARATOR_CHAR).append(checksum);
-
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
new file mode 100644
index 0000000..4b933ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcache;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * A utility class that contains helper methods for dealing with the internal
+ * shared cache structure.
+ */
+@Private
+@Unstable
+public class SharedCacheUtil {
+
+  private static final Log LOG = LogFactory.getLog(SharedCacheUtil.class);
+
+  @Private
+  public static int getCacheDepth(Configuration conf) {
+    int cacheDepth =
+        conf.getInt(YarnConfiguration.SHARED_CACHE_NESTED_LEVEL,
+            YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL);
+
+    if (cacheDepth <= 0) {
+      LOG.warn("Specified cache depth was less than or equal to zero."
+          + " Using default value instead. Default: "
+          + YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL
+          + ", Specified: " + cacheDepth);
+      cacheDepth = YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
+    }
+
+    return cacheDepth;
+  }
+
+  @Private
+  public static String getCacheEntryPath(int cacheDepth, String cacheRoot,
+      String checksum) {
+
+    if (cacheDepth <= 0) {
+      throw new IllegalArgumentException(
+          "The cache depth must be greater than 0. Passed value: " + cacheDepth);
+    }
+    if (checksum.length() < cacheDepth) {
+      throw new IllegalArgumentException("The checksum passed was too short: "
+          + checksum);
+    }
+
+    // Build the cache entry path to the specified depth. For example, if the
+    // depth is 3 and the checksum is 3c4f, the path would be:
+    // SHARED_CACHE_ROOT/3/c/4/3c4f
+    StringBuilder sb = new StringBuilder(cacheRoot);
+    for (int i = 0; i < cacheDepth; i++) {
+      sb.append(Path.SEPARATOR_CHAR);
+      sb.append(checksum.charAt(i));
+    }
+    sb.append(Path.SEPARATOR_CHAR).append(checksum);
+
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml
index 869298b..213d436 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml
@@ -44,6 +44,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-client</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
index 866c094..2f3ddb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -26,10 +26,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * This service maintains the shared cache meta data. It handles claiming and
@@ -47,12 +52,18 @@ public class SharedCacheManager extends CompositeService {
 
   private static final Log LOG = LogFactory.getLog(SharedCacheManager.class);
 
+  private SCMStore store;
+
   public SharedCacheManager() {
     super("SharedCacheManager");
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+
+    this.store = createSCMStoreService(conf);
+    addService(store);
+
     // init metrics
     DefaultMetricsSystem.initialize("SharedCacheManager");
     JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -60,6 +71,25 @@ public class SharedCacheManager extends CompositeService {
     super.serviceInit(conf);
   }
 
+  /**
+   * Instantiates the SCM store implementation configured under
+   * {@link YarnConfiguration#SCM_STORE_CLASS}, with
+   * {@link YarnConfiguration#DEFAULT_SCM_STORE_CLASS} passed as the default.
+   * This method only constructs the store; it does not init or start it.
+   *
+   * @param conf the configuration to read the store class from; it is also
+   *          handed to {@link ReflectionUtils#newInstance}
+   * @return a new store instance
+   * @throws YarnRuntimeException if the default store class cannot be loaded
+   */
+  @SuppressWarnings("unchecked")
+  private static SCMStore createSCMStoreService(Configuration conf) {
+    Class<? extends SCMStore> defaultStoreClass;
+    try {
+      defaultStoreClass =
+          (Class<? extends SCMStore>) Class
+              .forName(YarnConfiguration.DEFAULT_SCM_STORE_CLASS);
+    } catch (Exception e) {
+      // separate the class name from the text so the message stays readable
+      throw new YarnRuntimeException("Invalid default scm store class: "
+          + YarnConfiguration.DEFAULT_SCM_STORE_CLASS, e);
+    }
+
+    SCMStore store =
+        ReflectionUtils.newInstance(conf.getClass(
+            YarnConfiguration.SCM_STORE_CLASS,
+            defaultStoreClass, SCMStore.class), conf);
+    return store;
+  }
+
   @Override
   protected void serviceStop() throws Exception {
 
@@ -67,6 +97,14 @@ public class SharedCacheManager extends CompositeService {
     super.serviceStop();
   }
 
+  /**
+   * For testing purposes only.
+   */
+  @VisibleForTesting
+  SCMStore getSCMStore() {
+    return this.store;
+  }
+
   public static void main(String[] args) {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);
@@ -83,4 +121,24 @@ public class SharedCacheManager extends CompositeService {
       System.exit(-1);
     }
   }
+
+  /**
+   * Instantiates the app checker implementation configured under
+   * {@link YarnConfiguration#SCM_APP_CHECKER_CLASS}, with
+   * {@link YarnConfiguration#DEFAULT_SCM_APP_CHECKER_CLASS} passed as the
+   * default. This method only constructs the checker; it does not init or
+   * start it.
+   *
+   * @param conf the configuration to read the checker class from; it is also
+   *          handed to {@link ReflectionUtils#newInstance}
+   * @return a new app checker instance
+   * @throws YarnRuntimeException if the default app checker class cannot be
+   *           loaded
+   */
+  @Private
+  @SuppressWarnings("unchecked")
+  public static AppChecker createAppCheckerService(Configuration conf) {
+    Class<? extends AppChecker> defaultCheckerClass;
+    try {
+      defaultCheckerClass =
+          (Class<? extends AppChecker>) Class
+              .forName(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
+    } catch (Exception e) {
+      // separate the class name from the text so the message stays readable
+      throw new YarnRuntimeException("Invalid default scm app checker class: "
+          + YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS, e);
+    }
+
+    AppChecker checker =
+        ReflectionUtils.newInstance(conf.getClass(
+            YarnConfiguration.SCM_APP_CHECKER_CLASS, defaultCheckerClass,
+            AppChecker.class), conf);
+    return checker;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
new file mode 100644
index 0000000..79369d8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
+import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A thread safe version of an in-memory SCM store. The thread safety is
+ * implemented with two key pieces: (1) at the mapping level a ConcurrentHashMap
+ * is used to allow concurrency to resources and their associated references,
+ * and (2) a key level lock is used to ensure mutual exclusion between any
+ * operation that accesses a resource with the same key. <br>
+ * <br>
+ * To ensure safe key-level locking, we use the original string key and intern
+ * it weakly using Hadoop's <code>StringInterner</code>. This avoids the
+ * pitfalls of using built-in String interning. The interned strings are weakly
+ * referenced, so they can be garbage collected once no longer in use, and
+ * there is little risk of the keys being visible to other parts of the code
+ * and accidentally used as locks there. <br>
+ * <br>
+ * Resources in the in-memory store are evicted based on a time-based
+ * staleness criterion. If a resource is not referenced (i.e. used) for a given
+ * period, it is designated as a stale resource and is considered evictable.
+ */
+@Private
+@Evolving
+public class InMemorySCMStore extends SCMStore {
+  private static final Log LOG = LogFactory.getLog(InMemorySCMStore.class);
+
+  private final Map<String, SharedCacheResource> cachedResources =
+      new ConcurrentHashMap<String, SharedCacheResource>();
+  private Collection<ApplicationId> initialApps =
+      new ArrayList<ApplicationId>();
+  private final Object initialAppsLock = new Object();
+  private long startTime;
+  private int stalenessMinutes;
+  private AppChecker appChecker;
+  private ScheduledExecutorService scheduler;
+  private int initialDelayMin;
+  private int checkPeriodMin;
+
+  public InMemorySCMStore() {
+    super(InMemorySCMStore.class.getName());
+  }
+
+  private String intern(String key) {
+    return StringInterner.weakIntern(key);
+  }
+
+  /**
+   * The in-memory store bootstraps itself from the shared cache entries that
+   * exist in HDFS.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    this.startTime = System.currentTimeMillis();
+    this.initialDelayMin = getInitialDelay(conf);
+    this.checkPeriodMin = getCheckPeriod(conf);
+    this.stalenessMinutes = getStalenessPeriod(conf);
+
+    appChecker = createAppCheckerService(conf);
+    addService(appChecker);
+
+    bootstrap(conf);
+
+    ThreadFactory tf =
+        new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore")
+            .build();
+    scheduler = Executors.newSingleThreadScheduledExecutor(tf);
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // start composed services first
+    super.serviceStart();
+
+    // Get initial list of running applications
+    LOG.info("Getting the active app list to initialize the in-memory scm store");
+    synchronized (initialAppsLock) {
+      initialApps = appChecker.getActiveApplications();
+    }
+    LOG.info(initialApps.size() + " apps recorded as active at this time");
+
+    Runnable task = new AppCheckTask(appChecker);
+    scheduler.scheduleAtFixedRate(task, initialDelayMin, checkPeriodMin,
+        TimeUnit.MINUTES);
+    LOG.info("Scheduled the in-memory scm store app check task to run every "
+        + checkPeriodMin + " minutes.");
+  }
+
+  /**
+   * Shuts down the background app check task, waiting up to 10 seconds for it
+   * to terminate, and then delegates to the superclass stop logic.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    // serviceInit assigns the scheduler only after bootstrap succeeds, so it
+    // is still null when stop runs after a failed init; skip the shutdown then
+    // so that super.serviceStop() is still reached
+    if (scheduler != null) {
+      LOG.info("Shutting down the background thread.");
+      scheduler.shutdownNow();
+      try {
+        if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+          LOG.warn("Gave up waiting for the app check task to shutdown.");
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("The InMemorySCMStore was interrupted while shutting down the "
+            + "app check task.", e);
+      }
+      LOG.info("The background thread stopped.");
+    }
+
+    super.serviceStop();
+  }
+
+  @VisibleForTesting
+  AppChecker createAppCheckerService(Configuration conf) {
+    return SharedCacheManager.createAppCheckerService(conf);
+  }
+
+  /**
+   * Populates {@code cachedResources} with one entry per resource returned by
+   * {@link #getInitialCachedResources(FileSystem, Configuration)}, using the
+   * file system obtained from the given configuration. Keys are interned
+   * before being stored.
+   *
+   * @param conf the configuration used to obtain the file system and passed
+   *          on to the initial resource scan
+   * @throws IOException if obtaining the file system or the initial resource
+   *           scan fails
+   */
+  private void bootstrap(Configuration conf) throws IOException {
+    Map<String, String> initialCachedResources =
+        getInitialCachedResources(FileSystem.get(conf), conf);
+    LOG.info("Bootstrapping from " + initialCachedResources.size()
+        + " cache resources located in the file system");
+    Iterator<Map.Entry<String, String>> it =
+        initialCachedResources.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<String, String> e = it.next();
+      String key = intern(e.getKey());
+      String fileName = e.getValue();
+      SharedCacheResource resource = new SharedCacheResource(fileName);
+      // we don't hold the lock for this as it is done as part of serviceInit
+      cachedResources.put(key, resource);
+      // clear out the initial resource to reduce the footprint
+      it.remove();
+    }
+    LOG.info("Bootstrapping complete");
+  }
+
+  /**
+   * Scans the shared cache root directory and builds a map from checksum (the
+   * name of a file's immediate parent directory) to file name. Only the first
+   * file seen for a checksum is recorded; any further file under the same
+   * checksum is logged with a warning and left out.
+   *
+   * @param fs the file system to query
+   * @param conf the configuration supplying the shared cache root and the
+   *          nested directory level
+   * @return a new map of checksum to file name; empty if the glob matched
+   *         nothing
+   * @throws IOException if the shared cache root directory does not exist, or
+   *           if a file system call fails
+   */
+  @VisibleForTesting
+  Map<String, String> getInitialCachedResources(FileSystem fs,
+      Configuration conf) throws IOException {
+    // get the root directory for the shared cache
+    String location =
+        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+    Path root = new Path(location);
+    if (!fs.exists(root)) {
+      String message =
+          "The shared cache root directory " + location + " was not found";
+      LOG.error(message);
+      throw new IOException(message);
+    }
+
+    int nestedLevel = SharedCacheUtil.getCacheDepth(conf);
+    // now traverse individual directories and process them
+    // the directory structure is specified by the nested level parameter
+    // (e.g. 9/c/d/<checksum>/file)
+    // the glob is nestedLevel + 1 directory wildcards (the nested levels plus
+    // the checksum directory) followed by one wildcard for the file itself
+    StringBuilder pattern = new StringBuilder();
+    for (int i = 0; i < nestedLevel + 1; i++) {
+      pattern.append("*/");
+    }
+    pattern.append("*");
+
+    LOG.info("Querying for all individual cached resource files");
+    FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString()));
+    // the result is null-checked here and again before iterating below
+    int numEntries = entries == null ? 0 : entries.length;
+    LOG.info("Found " + numEntries + " files: processing for one resource per "
+        + "key");
+
+    Map<String, String> initialCachedEntries = new HashMap<String, String>();
+    if (entries != null) {
+      for (FileStatus entry : entries) {
+        Path file = entry.getPath();
+        String fileName = file.getName();
+        if (entry.isFile()) {
+          // get the parent to get the checksum
+          Path parent = file.getParent();
+          if (parent != null) {
+            // the name of the immediate parent directory is the checksum
+            String key = parent.getName();
+            // make sure we insert only one file per checksum whichever comes
+            // first
+            if (initialCachedEntries.containsKey(key)) {
+              LOG.warn("Key " + key + " is already mapped to file "
+                  + initialCachedEntries.get(key) + "; file " + fileName
+                  + " will not be added");
+            } else {
+              initialCachedEntries.put(key, fileName);
+            }
+          }
+        }
+      }
+    }
+    LOG.info("A total of " + initialCachedEntries.size()
+        + " files are now mapped");
+    return initialCachedEntries;
+  }
+
+  /**
+   * Registers a resource under the given key unless the key is already
+   * mapped. The first file name registered for a key wins: a later call with
+   * the same key leaves the existing entry untouched and reports its file
+   * name. The answer only reflects the store at the time of the call; the
+   * entry may change or be removed afterwards and callers must cope with that.
+   *
+   * @param key the unique identifier of the resource
+   * @param fileName the file name to register if the key is not yet mapped
+   * @return the file name now associated with the key, i.e. either the given
+   *         one or the one that was registered before
+   */
+  @Override
+  public String addResource(String key, String fileName) {
+    final String lockKey = intern(key);
+    // same monitor as the other per-key operations of this class
+    synchronized (lockKey) {
+      SharedCacheResource existing = cachedResources.get(lockKey);
+      if (existing != null) {
+        return existing.getFileName();
+      }
+      SharedCacheResource created = new SharedCacheResource(fileName);
+      cachedResources.put(lockKey, created);
+      return created.getFileName();
+    }
+  }
+
+  /**
+   * Registers the given reference with the resource mapped to the key and
+   * refreshes the resource's access time. A non-null result tells the caller
+   * that the resource will not be removed at least until the application named
+   * in the reference has terminated.
+   *
+   * @param key the unique identifier of the resource
+   * @param ref the reference to register
+   * @return the file name of the resource, or null if the key is not mapped
+   */
+  @Override
+  public String addResourceReference(String key,
+      SharedCacheResourceReference ref) {
+    final String lockKey = intern(key);
+    synchronized (lockKey) {
+      SharedCacheResource target = cachedResources.get(lockKey);
+      if (target != null) {
+        target.addReference(ref);
+        target.updateAccessTime();
+        return target.getFileName();
+      }
+      // the key is not mapped
+      return null;
+    }
+  }
+
+  /**
+   * Returns the references currently registered with the resource mapped to
+   * the key. The result is an unmodifiable copy taken at the time of the
+   * call, so later changes to the store are not visible through it and some
+   * or all of the returned references may no longer be relevant.
+   *
+   * @param key the unique identifier of the resource
+   * @return an unmodifiable snapshot of the resource's references; an empty
+   *         collection if the key is not mapped or has no references
+   */
+  @Override
+  public Collection<SharedCacheResourceReference> getResourceReferences(String key) {
+    final String lockKey = intern(key);
+    synchronized (lockKey) {
+      SharedCacheResource target = cachedResources.get(lockKey);
+      if (target != null) {
+        // copy while holding the lock so the snapshot is consistent
+        Set<SharedCacheResourceReference> snapshot =
+            new HashSet<SharedCacheResourceReference>(
+                target.getResourceReferences());
+        return Collections.unmodifiableSet(snapshot);
+      }
+      return Collections.emptySet();
+    }
+  }
+
+  /**
+   * Drops a single reference from the resource mapped to the key. Nothing
+   * happens when the key is not mapped.
+   *
+   * @param key the unique identifier of the resource
+   * @param ref the reference to drop
+   * @param updateAccessTime whether to refresh the resource's access time;
+   *          only honored when the key is mapped
+   * @return true if the reference was registered and has been dropped, false
+   *         otherwise (including when the key is not mapped)
+   */
+  @Override
+  public boolean removeResourceReference(String key, SharedCacheResourceReference ref,
+      boolean updateAccessTime) {
+    final String lockKey = intern(key);
+    synchronized (lockKey) {
+      SharedCacheResource target = cachedResources.get(lockKey);
+      if (target == null) {
+        return false;
+      }
+      boolean wasRegistered = target.getResourceReferences().remove(ref);
+      if (updateAccessTime) {
+        target.updateAccessTime();
+      }
+      return wasRegistered;
+    }
+  }
+
+  /**
+   * Drops every reference in the given collection from the resource mapped to
+   * the key. Nothing happens when the key is not mapped.
+   *
+   * @param key the unique identifier of the resource
+   * @param refs the references to drop
+   * @param updateAccessTime whether to refresh the resource's access time;
+   *          only honored when the key is mapped
+   */
+  @Override
+  public void removeResourceReferences(String key,
+      Collection<SharedCacheResourceReference> refs, boolean updateAccessTime) {
+    final String lockKey = intern(key);
+    synchronized (lockKey) {
+      SharedCacheResource target = cachedResources.get(lockKey);
+      if (target == null) {
+        return;
+      }
+      target.getResourceReferences().removeAll(refs);
+      if (updateAccessTime) {
+        target.updateAccessTime();
+      }
+    }
+  }
+
+  /**
+   * Removes the resource mapped to the key, provided nothing references it.
+   *
+   * @param key the unique identifier of the resource
+   * @return true if the resource was removed or the key was not mapped in the
+   *         first place; false if the resource still has references and was
+   *         therefore left in the store
+   */
+  @Override
+  public boolean removeResource(String key) {
+    final String lockKey = intern(key);
+    synchronized (lockKey) {
+      SharedCacheResource target = cachedResources.get(lockKey);
+      if (target != null) {
+        if (!target.getResourceReferences().isEmpty()) {
+          // still in use
+          return false;
+        }
+        cachedResources.remove(lockKey);
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Obtains the access time for a resource. It represents the view of the
+   * resource at the time of the query. The value may have been updated at a
+   * later point.
+   * 
+   * @return the access time of the resource if found; -1 if the resource is not
+   *         found
+   */
+  @VisibleForTesting
+  long getAccessTime(String key) {
+    String interned = intern(key);
+    synchronized (interned) {
+      SharedCacheResource resource = cachedResources.get(interned);
+      return resource == null ? -1 : resource.getAccessTime();
+    }
+  }
+
+  /**
+   * Decides whether the resource may be evicted. Nothing is evictable while
+   * the initial application list is non-empty. Otherwise a resource is
+   * evictable once its last known use is older than the staleness period.
+   * The last known use is the access time recorded in the store or, for a key
+   * the store does not know, the later of the file's modification time and
+   * the store's start time.
+   *
+   * @param key the unique identifier of the resource
+   * @param file the status of the resource file in the file system
+   * @return true if the resource is evictable, false otherwise
+   */
+  @Override
+  public boolean isResourceEvictable(String key, FileStatus file) {
+    synchronized (initialAppsLock) {
+      if (!initialApps.isEmpty()) {
+        return false;
+      }
+    }
+
+    long staleTime =
+        System.currentTimeMillis()
+            - TimeUnit.MINUTES.toMillis(this.stalenessMinutes);
+    long accessTime = getAccessTime(key);
+    if (accessTime != -1) {
+      // the store knows the resource: go by its access time
+      return accessTime < staleTime;
+    }
+    // unknown to the store: fall back to the modification time, but never
+    // trust anything older than the store start time, which is the last point
+    // of certainty
+    long modTime = file.getModificationTime();
+    long lastUse = modTime >= this.startTime ? modTime : this.startTime;
+    return lastUse < staleTime;
+  }
+
+  private static int getStalenessPeriod(Configuration conf) {
+    int stalenessMinutes =
+        conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD,
+            YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD);
+    // non-positive value is invalid; use the default
+    if (stalenessMinutes <= 0) {
+      throw new HadoopIllegalArgumentException("Non-positive staleness value: "
+          + stalenessMinutes
+          + ". The staleness value must be greater than zero.");
+    }
+    return stalenessMinutes;
+  }
+
+  private static int getInitialDelay(Configuration conf) {
+    int initialMinutes =
+        conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY,
+            YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY);
+    // non-positive value is invalid; use the default
+    if (initialMinutes <= 0) {
+      throw new HadoopIllegalArgumentException(
+          "Non-positive initial delay value: " + initialMinutes
+              + ". The initial delay value must be greater than zero.");
+    }
+    return initialMinutes;
+  }
+
+  private static int getCheckPeriod(Configuration conf) {
+    int checkMinutes =
+        conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD,
+            YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD);
+    // non-positive value is invalid; use the default
+    if (checkMinutes <= 0) {
+      throw new HadoopIllegalArgumentException(
+          "Non-positive check period value: " + checkMinutes
+              + ". The check period value must be greater than zero.");
+    }
+    return checkMinutes;
+  }
+
+  @Private
+  @Evolving
+  class AppCheckTask implements Runnable {
+
+    private final AppChecker taskAppChecker;
+
+    public AppCheckTask(AppChecker appChecker) {
+      this.taskAppChecker = appChecker;
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.info("Checking the initial app list for finished applications.");
+        synchronized (initialAppsLock) {
+          if (initialApps.isEmpty()) {
+            // we're fine, no-op; there are no active apps that were running at
+            // the time of the service start
+          } else {
+            LOG.info("Looking into " + initialApps.size()
+                + " apps to see if they are still active");
+            Iterator<ApplicationId> it = initialApps.iterator();
+            while (it.hasNext()) {
+              ApplicationId id = it.next();
+              try {
+                if (!taskAppChecker.isApplicationActive(id)) {
+                  // remove it from the list
+                  it.remove();
+                }
+              } catch (YarnException e) {
+                LOG.warn("Exception while checking the app status;"
+                    + " will leave the entry in the list", e);
+                // continue
+              }
+            }
+          }
+          LOG.info("There are now " + initialApps.size()
+              + " entries in the list");
+        }
+      } catch (Throwable e) {
+        LOG.error(
+            "Unexpected exception thrown during in-memory store app check task."
+                + " Rescheduling task.", e);
+      }
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
new file mode 100644
index 0000000..397d904
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.service.CompositeService;
+
+
+/**
+ * An abstract class for the data store used by the shared cache manager
+ * service. All implementations of the abstract methods in this class need to
+ * be thread safe and atomic.
+ */
+@Private
+@Evolving
+public abstract class SCMStore extends CompositeService {
+
+  protected SCMStore(String name) {
+    super(name);
+  }
+
+  /**
+   * Add a resource and its associated filename to the shared cache. The
+   * resource is identified by a unique key. If the key already exists no action
+   * is taken and the filename of the existing resource is returned. If the key
+   * does not exist, the resource is added, its access time is set, and the
+   * filename of the resource is returned.
+   *
+   * @param key a unique identifier for a resource
+   * @param fileName the filename of the resource
+   * @return the filename of the resource as represented by the cache
+   */
+  @Private
+  public abstract String addResource(String key, String fileName);
+
+  /**
+   * Remove a resource from the shared cache.
+   *
+   * @param key a unique identifier for a resource
+   * @return true if the resource was removed or did not exist, false if the
+   *         resource existed, contained at least one
+   *         <code>SharedCacheResourceReference</code> and was not removed.
+   */
+  @Private
+  public abstract boolean removeResource(String key);
+
+  /**
+   * Add a <code>SharedCacheResourceReference</code> to a resource and update
+   * the resource access time.
+   *
+   * @param key a unique identifier for a resource
+   * @param ref the <code>SharedCacheResourceReference</code> to add
+   * @return String the filename of the resource if the
+   *         <code>SharedCacheResourceReference</code> was added or already
+   *         existed. null if the resource did not exist
+   */
+  @Private
+  public abstract String addResourceReference(String key,
+      SharedCacheResourceReference ref);
+
+  /**
+   * Get the <code>SharedCacheResourceReference</code>(s) associated with the
+   * resource.
+   *
+   * @param key a unique identifier for a resource
+   * @return an unmodifiable collection of
+   *         <code>SharedCacheResourceReferences</code>. If the resource does
+   *         not exist, an empty set is returned.
+   */
+  @Private
+  public abstract Collection<SharedCacheResourceReference> getResourceReferences(
+      String key);
+
+  /**
+   * Remove a <code>SharedCacheResourceReference</code> from a resource.
+   *
+   * @param key a unique identifier for a resource
+   * @param ref the <code>SharedCacheResourceReference</code> to remove
+   * @param updateAccessTime true if the call should update the access time for
+   *          the resource
+   * @return true if the reference was removed, false otherwise
+   */
+  @Private
+  public abstract boolean removeResourceReference(String key,
+      SharedCacheResourceReference ref, boolean updateAccessTime);
+
+  /**
+   * Remove a collection of <code>SharedCacheResourceReferences</code> from a
+   * resource.
+   *
+   * @param key a unique identifier for a resource
+   * @param refs the collection of <code>SharedCacheResourceReference</code>s to
+   *          remove
+   * @param updateAccessTime true if the call should update the access time for
+   *          the resource
+   */
+  @Private
+  public abstract void removeResourceReferences(String key,
+      Collection<SharedCacheResourceReference> refs, boolean updateAccessTime);
+
+  /**
+   * Check if a specific resource is evictable according to the store's enabled
+   * cache eviction policies.
+   *
+   * @param key a unique identifier for a resource
+   * @param file the <code>FileStatus</code> object for the resource file in the
+   *          file system.
+   * @return true if the resource is evictable, false otherwise
+   */
+  @Private
+  public abstract boolean isResourceEvictable(String key, FileStatus file);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java
new file mode 100644
index 0000000..cb0df54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Bookkeeping for one cached resource: its file name, the set of references
+ * to it, and the time it was last accessed. Instances do no locking of their
+ * own. Apart from the immutable file name, callers must guard every access
+ * with their own thread-safe mechanism.
+ */
+@Private
+@Evolving
+class SharedCacheResource {
+  private final String fileName;
+  private final Set<SharedCacheResourceReference> refs =
+      new HashSet<SharedCacheResourceReference>();
+  private long accessTime;
+
+  /**
+   * Creates a resource with no references whose access time is the current
+   * time.
+   *
+   * @param fileName the file name of the resource
+   */
+  SharedCacheResource(String fileName) {
+    this.fileName = fileName;
+    this.accessTime = System.currentTimeMillis();
+  }
+
+  /** Returns the time, in milliseconds, of the last recorded access. */
+  long getAccessTime() {
+    return this.accessTime;
+  }
+
+  /** Sets the access time to the current time. */
+  void updateAccessTime() {
+    this.accessTime = System.currentTimeMillis();
+  }
+
+  /** Returns the file name given at construction. */
+  String getFileName() {
+    return fileName;
+  }
+
+  /**
+   * Returns the live, mutable set of references; changes made through it
+   * change this resource.
+   */
+  Set<SharedCacheResourceReference> getResourceReferences() {
+    return refs;
+  }
+
+  /**
+   * Adds a reference to this resource.
+   *
+   * @return true if the reference was not registered before
+   */
+  boolean addReference(SharedCacheResourceReference ref) {
+    return refs.add(ref);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java
new file mode 100644
index 0000000..d595d97
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * Immutable value object describing one reference to a shared cache
+ * resource: the application that uses the resource and the short name of the
+ * user that created the reference. Two references are equal when both parts
+ * are equal.
+ */
+@Private
+@Evolving
+public class SharedCacheResourceReference {
+  private final ApplicationId appId;
+  private final String shortUserName;
+
+  /**
+   * Create a resource reference.
+   *
+   * @param appId <code>ApplicationId</code> that is referencing a resource.
+   * @param shortUserName <code>ShortUserName</code> of the user that created
+   *          the reference.
+   */
+  public SharedCacheResourceReference(ApplicationId appId, String shortUserName) {
+    this.appId = appId;
+    this.shortUserName = shortUserName;
+  }
+
+  public ApplicationId getAppId() {
+    return appId;
+  }
+
+  public String getShortUserName() {
+    return shortUserName;
+  }
+
+  @Override
+  public int hashCode() {
+    // same value as the usual 31-based accumulation starting from 1
+    int appHash = (appId == null) ? 0 : appId.hashCode();
+    int userHash = (shortUserName == null) ? 0 : shortUserName.hashCode();
+    return 31 * (31 + appHash) + userHash;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    SharedCacheResourceReference that = (SharedCacheResourceReference) obj;
+    // null-safe comparison of the application id
+    if (appId == null ? that.appId != null : !appId.equals(that.appId)) {
+      return false;
+    }
+    // null-safe comparison of the user name
+    return shortUserName == null ? that.shortUserName == null
+        : shortUserName.equals(that.shortUserName);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f426fe2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
new file mode 100644
index 0000000..891703e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestInMemorySCMStore {
+
+  private InMemorySCMStore store;
+  private AppChecker checker;
+
+  @Before
+  public void setup() {
+    this.store = spy(new InMemorySCMStore());
+    this.checker = spy(new DummyAppChecker());
+    doReturn(checker).when(store).createAppCheckerService(
+        isA(Configuration.class));
+  }
+
+  /** Stops the store created by {@link #setup()}, if there is one. */
+  @After
+  public void cleanup() {
+    if (this.store == null) {
+      return;
+    }
+    this.store.stop();
+  }
+
+  /**
+   * Initializes and starts the store with no active applications and no
+   * initial cached resources.
+   */
+  private void startEmptyStore() throws Exception {
+    List<ApplicationId> noApps = new ArrayList<ApplicationId>();
+    doReturn(noApps).when(checker).getActiveApplications();
+    Map<String, String> noResources = new HashMap<String, String>();
+    doReturn(noResources).when(store).getInitialCachedResources(
+        isA(FileSystem.class), isA(Configuration.class));
+    Configuration conf = new Configuration();
+    this.store.init(conf);
+    this.store.start();
+  }
+
+  private Map<String, String> startStoreWithResources() throws Exception {
+    Map<String, String> initialCachedResources = new HashMap<String, String>();
+    int count = 10;
+    for (int i = 0; i < count; i++) {
+      String key = String.valueOf(i);
+      String fileName = key + ".jar";
+      initialCachedResources.put(key, fileName);
+    }
+    doReturn(new ArrayList<ApplicationId>()).when(checker)
+        .getActiveApplications();
+    doReturn(initialCachedResources).when(store).getInitialCachedResources(
+        isA(FileSystem.class), isA(Configuration.class));
+    this.store.init(new Configuration());
+    this.store.start();
+    return initialCachedResources;
+  }
+
+  private void startStoreWithApps() throws Exception {
+    ArrayList<ApplicationId> list = new ArrayList<ApplicationId>();
+    int count = 5;
+    for (int i = 0; i < count; i++) {
+      list.add(createAppId(i, i));
+    }
+    doReturn(list).when(checker).getActiveApplications();
+    doReturn(new HashMap<String, String>()).when(store)
+        .getInitialCachedResources(isA(FileSystem.class),
+            isA(Configuration.class));
+    this.store.init(new Configuration());
+    this.store.start();
+  }
+
+  /**
+   * Several threads add different file names under the same key at the same
+   * time; every caller must be told the same winning file name.
+   */
+  @Test
+  public void testAddResourceConcurrency() throws Exception {
+    startEmptyStore();
+    final String key = "key1";
+    int count = 5;
+    ExecutorService exec = Executors.newFixedThreadPool(count);
+    try {
+      List<Future<String>> futures = new ArrayList<Future<String>>(count);
+      final CountDownLatch start = new CountDownLatch(1);
+      for (int i = 0; i < count; i++) {
+        final String fileName = "foo-" + i + ".jar";
+        Callable<String> task = new Callable<String>() {
+          @Override
+          public String call() throws Exception {
+            start.await();
+            String result = store.addResource(key, fileName);
+            System.out.println("fileName: " + fileName + ", result: " + result);
+            return result;
+          }
+        };
+        futures.add(exec.submit(task));
+      }
+      // start them all at the same time
+      start.countDown();
+      // check the result; they should all agree with the value
+      Set<String> results = new HashSet<String>();
+      for (Future<String> future: futures) {
+        results.add(future.get());
+      }
+      // compare by value; assertSame would compare boxed Integers by identity
+      assertEquals(1, results.size());
+    } finally {
+      // release the pool threads even when an assertion or a task fails
+      exec.shutdownNow();
+    }
+  }
+
+  /** Adding a reference under a key that was never added must yield null. */
+  @Test
+  public void testAddResourceRefNonExistentResource() throws Exception {
+    startEmptyStore();
+    String key = "key1";
+    SharedCacheResourceReference ref =
+        new SharedCacheResourceReference(createAppId(1, 1L), "user");
+    // the key has not been added, so there is nothing to attach the ref to
+    String result = store.addResourceReference(key, ref);
+    assertNull(result);
+  }
+
+  @Test
+  public void testRemoveResourceEmptyRefs() throws Exception {
+    startEmptyStore();
+    String key = "key1";
+    String fileName = "foo.jar";
+    // first add resource
+    store.addResource(key, fileName);
+    // try removing the resource; it should return true
+    assertTrue(store.removeResource(key));
+  }
+
+  /**
+   * A resource that still has a reference must not be removable, and a failed
+   * removal must leave both the resource and the reference in place.
+   */
+  @Test
+  public void testAddResourceRefRemoveResource() throws Exception {
+    startEmptyStore();
+    String key = "key1";
+    ApplicationId id = createAppId(1, 1L);
+    String user = "user";
+    // add the resource, and then add a resource ref
+    store.addResource(key, "foo.jar");
+    store.addResourceReference(key, new SharedCacheResourceReference(id, user));
+    // removeResource should return false
+    assertFalse(store.removeResource(key));
+    // the resource and the ref should be intact
+    Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
+    assertTrue(refs != null);
+    assertEquals(Collections.singleton(new SharedCacheResourceReference(id, user)), refs);
+  }
+
+  /**
+   * Several applications add a reference to the same resource at the same
+   * time; all must get the resource's file name and all references must be
+   * recorded.
+   */
+  @Test
+  public void testAddResourceRefConcurrency() throws Exception {
+    startEmptyStore();
+    final String key = "key1";
+    final String user = "user";
+    String fileName = "foo.jar";
+
+    // first add the resource
+    store.addResource(key, fileName);
+
+    // make concurrent addResourceRef calls (clients)
+    int count = 5;
+    ExecutorService exec = Executors.newFixedThreadPool(count);
+    try {
+      List<Future<String>> futures = new ArrayList<Future<String>>(count);
+      final CountDownLatch start = new CountDownLatch(1);
+      for (int i = 0; i < count; i++) {
+        final ApplicationId id = createAppId(i, i);
+        Callable<String> task = new Callable<String>() {
+          @Override
+          public String call() throws Exception {
+            start.await();
+            return store.addResourceReference(key,
+                new SharedCacheResourceReference(id, user));
+          }
+        };
+        futures.add(exec.submit(task));
+      }
+      // start them all at the same time
+      start.countDown();
+      // check the result
+      Set<String> results = new HashSet<String>();
+      for (Future<String> future: futures) {
+        results.add(future.get());
+      }
+      // they should all have the same file name; compare sizes by value, as
+      // assertSame would compare boxed Integers by identity
+      assertEquals(1, results.size());
+      assertEquals(Collections.singleton(fileName), results);
+      // there should be 5 refs as a result
+      Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
+      assertEquals(count, refs.size());
+    } finally {
+      // release the pool threads even when an assertion or a task fails
+      exec.shutdownNow();
+    }
+  }
+
+  /**
+   * Races adding a resource against adding a reference to it. Adding the
+   * resource must always succeed; adding the reference yields either null or
+   * the file name, depending on which call ran first.
+   */
+  @Test
+  public void testAddResourceRefAddResourceConcurrency() throws Exception {
+    startEmptyStore();
+    final String key = "key1";
+    final String fileName = "foo.jar";
+    final String user = "user";
+    final ApplicationId id = createAppId(1, 1L);
+    // add the resource and add the resource ref at the same time
+    ExecutorService exec = Executors.newFixedThreadPool(2);
+    try {
+      final CountDownLatch start = new CountDownLatch(1);
+      Callable<String> addKeyTask = new Callable<String>() {
+        @Override
+        public String call() throws Exception {
+          start.await();
+          return store.addResource(key, fileName);
+        }
+      };
+      Callable<String> addAppIdTask = new Callable<String>() {
+        @Override
+        public String call() throws Exception {
+          start.await();
+          return store.addResourceReference(key,
+              new SharedCacheResourceReference(id, user));
+        }
+      };
+      Future<String> addAppIdFuture = exec.submit(addAppIdTask);
+      Future<String> addKeyFuture = exec.submit(addKeyTask);
+      // start them at the same time
+      start.countDown();
+      // get the results
+      String addKeyResult = addKeyFuture.get();
+      String addAppIdResult = addAppIdFuture.get();
+      assertEquals(fileName, addKeyResult);
+      System.out.println("addAppId() result: " + addAppIdResult);
+      // it may be null or the fileName depending on the timing
+      assertTrue(addAppIdResult == null || addAppIdResult.equals(fileName));
+    } finally {
+      // release the pool threads even when an assertion or a task fails
+      exec.shutdownNow();
+    }
+  }
+
+  /**
+   * Adds a resource and a single reference to it, then removes that reference
+   * and checks that no references remain for the key.
+   */
+  @Test
+  public void testRemoveRef() throws Exception {
+    startEmptyStore();
+    String key = "key1";
+    String fileName = "foo.jar";
+    String user = "user";
+    // first add the resource
+    store.addResource(key, fileName);
+    // add a ref
+    ApplicationId id = createAppId(1, 1L);
+    SharedCacheResourceReference myRef = new SharedCacheResourceReference(id, user);
+    String result = store.addResourceReference(key, myRef);
+    assertEquals(fileName, result);
+    Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
+    // compare by value: assertSame would compare two autoboxed Integers by
+    // identity and only pass thanks to the small-value Integer cache
+    assertEquals(1, refs.size());
+    assertEquals(Collections.singleton(myRef), refs);
+    // remove the same ref
+    store.removeResourceReferences(key, Collections.singleton(myRef), true);
+    Collection<SharedCacheResourceReference> newRefs = store.getResourceReferences(key);
+    // the store may report "no references" either as null or as an empty
+    // collection; both are accepted here
+    assertTrue(newRefs == null || newRefs.isEmpty());
+  }
+
+  /**
+   * Starts the store with pre-existing cached resources and checks that each
+   * of them can be referenced afterwards.
+   *
+   * Keys are the decimal indices {@code "0"} to {@code count - 1}, and the
+   * expected file name for a key is {@code key + ".jar"}.
+   */
+  @Test
+  public void testBootstrapping() throws Exception {
+    Map<String, String> initialCachedResources = startStoreWithResources();
+    // NOTE(review): count is read after the store has been started. If startup
+    // drains this map (which the isEmpty() check below expects), count is 0
+    // and the loop body never runs -- confirm against
+    // startStoreWithResources() and consider a fixed expected count.
+    int count = initialCachedResources.size();
+    ApplicationId id = createAppId(1, 1L);
+    // the entries from the cached entries should now exist
+    for (int i = 0; i < count; i++) {
+      String key = String.valueOf(i);
+      String fileName = key + ".jar";
+      String result =
+          store.addResourceReference(key, new SharedCacheResourceReference(id,
+              "user"));
+      // the value should not be null (i.e. it has the key) and the filename should match
+      assertEquals(fileName, result);
+    }
+    // the initial input should be emptied; this does not depend on the loop
+    // variable, so it is checked once here and is still verified when there is
+    // nothing to iterate over
+    assertTrue(initialCachedResources.isEmpty());
+  }
+
+  @Test
+  public void testEvictableWithInitialApps() throws Exception {
+    startStoreWithApps();
+    assertFalse(store.isResourceEvictable("key", mock(FileStatus.class)));
+  }
+
+  /**
+   * Builds an {@link ApplicationId} for use in the tests.
+   *
+   * Note the argument order: this helper takes {@code (id, timestamp)} but
+   * forwards them to {@code ApplicationId.newInstance} as
+   * {@code (timestamp, id)}.
+   *
+   * @param id value passed as the second argument of {@code newInstance}
+   * @param timestamp value passed as the first argument of {@code newInstance}
+   * @return the newly created application id
+   */
+  private ApplicationId createAppId(int id, long timestamp) {
+    ApplicationId appId = ApplicationId.newInstance(timestamp, id);
+    return appId;
+  }
+
+  /**
+   * Stub {@link AppChecker} for the tests: it reports every application as
+   * inactive and returns {@code null} as the collection of active
+   * applications.
+   *
+   * Declared {@code static} because it uses no state of the enclosing test
+   * instance, so it does not need a hidden reference to it.
+   */
+  static class DummyAppChecker extends AppChecker {
+
+    /**
+     * @param id ignored
+     * @return always {@code false}
+     */
+    @Override
+    @Private
+    public boolean isApplicationActive(ApplicationId id) throws YarnException {
+      // stub
+      return false;
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    @Private
+    public Collection<ApplicationId> getActiveApplications()
+        throws YarnException {
+      // stub
+      return null;
+    }
+
+  }
+}