You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/11/12 18:31:17 UTC

hadoop git commit: YARN-2236. [YARN-1492] Shared Cache uploader service on the Node Manager. (Chris Trezzo and Sangjin Lee via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk f8aefa5e9 -> a04143039


YARN-2236. [YARN-1492] Shared Cache uploader service on the Node Manager. (Chris Trezzo and Sangjin Lee via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a0414303
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a0414303
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a0414303

Branch: refs/heads/trunk
Commit: a04143039e7fe310d807f40584633096181cfada
Parents: f8aefa5
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Nov 12 09:31:05 2014 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Nov 12 09:31:05 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/api/records/LocalResource.java  |  38 +++
 .../hadoop/yarn/conf/YarnConfiguration.java     |  19 ++
 .../src/main/proto/yarn_protos.proto            |   1 +
 .../records/impl/pb/LocalResourcePBImpl.java    |  20 ++
 .../yarn/sharedcache/ChecksumSHA256Impl.java    |  37 +++
 .../yarn/sharedcache/SharedCacheChecksum.java   |  43 +++
 .../sharedcache/SharedCacheChecksumFactory.java |  84 ++++++
 .../org/apache/hadoop/yarn/util/FSDownload.java |   5 +-
 .../src/main/resources/yarn-default.xml         |  18 ++
 .../hadoop/yarn/server/utils/BuilderUtils.java  |  10 +-
 .../containermanager/ContainerManagerImpl.java  |  13 +
 .../container/ContainerImpl.java                |  72 ++++-
 .../localizer/LocalResourceRequest.java         |  11 +
 .../sharedcache/SharedCacheUploadEvent.java     |  58 ++++
 .../sharedcache/SharedCacheUploadEventType.java |  28 ++
 .../sharedcache/SharedCacheUploadService.java   | 126 ++++++++
 .../sharedcache/SharedCacheUploader.java        | 289 +++++++++++++++++++
 .../container/TestContainer.java                |   2 +-
 .../TestResourceLocalizationService.java        |   2 +-
 .../TestSharedCacheUploadService.java           |  50 ++++
 .../sharedcache/TestSharedCacheUploader.java    | 241 ++++++++++++++++
 22 files changed, 1158 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 05bb8f6..ec3a331 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -45,6 +45,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2186. [YARN-1492] Node Manager uploader service for cache manager. 
     (Chris Trezzo and Sangjin Lee via kasha)
 
+    YARN-2236. [YARN-1492] Shared Cache uploader service on the Node 
+    Manager. (Chris Trezzo and Sangjin Lee via kasha)
+
   IMPROVEMENTS
 
     YARN-1979. TestDirectoryCollection fails when the umask is unusual.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
index f14a136..726d969 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -48,6 +49,14 @@ public abstract class LocalResource {
   public static LocalResource newInstance(URL url, LocalResourceType type,
       LocalResourceVisibility visibility, long size, long timestamp,
       String pattern) {
+    return newInstance(url, type, visibility, size, timestamp, pattern, false);
+  }
+
+  @Public
+  @Unstable
+  public static LocalResource newInstance(URL url, LocalResourceType type,
+      LocalResourceVisibility visibility, long size, long timestamp,
+      String pattern, boolean shouldBeUploadedToSharedCache) {
     LocalResource resource = Records.newRecord(LocalResource.class);
     resource.setResource(url);
     resource.setType(type);
@@ -55,6 +64,7 @@ public abstract class LocalResource {
     resource.setSize(size);
     resource.setTimestamp(timestamp);
     resource.setPattern(pattern);
+    resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
     return resource;
   }
 
@@ -65,6 +75,15 @@ public abstract class LocalResource {
     return newInstance(url, type, visibility, size, timestamp, null);
   }
 
+  @Public
+  @Unstable
+  public static LocalResource newInstance(URL url, LocalResourceType type,
+      LocalResourceVisibility visibility, long size, long timestamp,
+      boolean shouldBeUploadedToSharedCache) {
+    return newInstance(url, type, visibility, size, timestamp, null,
+        shouldBeUploadedToSharedCache);
+  }
+
   /**
    * Get the <em>location</em> of the resource to be localized.
    * @return <em>location</em> of the resource to be localized
@@ -170,4 +189,23 @@ public abstract class LocalResource {
   @Public
   @Stable
   public abstract void setPattern(String pattern);
+
+  /**
+   * NM uses this to decide whether it is necessary to upload the resource to
+   * the shared cache.
+   */
+  @Public
+  @Unstable
+  public abstract boolean getShouldBeUploadedToSharedCache();
+
+  /**
+   * Inform NM whether upload to SCM is needed.
+   *
+   * @param shouldBeUploadedToSharedCache <em>shouldBeUploadedToSharedCache</em>
+   *          of this request
+   */
+  @Public
+  @Unstable
+  public abstract void setShouldBeUploadedToSharedCache(
+      boolean shouldBeUploadedToSharedCache);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9b57a42..4b4f581 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1472,6 +1472,25 @@ public class YarnConfiguration extends Configuration {
       SHARED_CACHE_PREFIX + "uploader.server.thread-count";
   public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50;
 
+  /** the checksum algorithm implementation **/
+  public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL =
+      SHARED_CACHE_PREFIX + "checksum.algo.impl";
+  public static final String DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL =
+      "org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl";
+
+  // node manager (uploader) configs
+  /**
+   * The replication factor for the node manager uploader for the shared cache.
+   */
+  public static final String SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR =
+      SHARED_CACHE_PREFIX + "nm.uploader.replication.factor";
+  public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR =
+      10;
+
+  public static final String SHARED_CACHE_NM_UPLOADER_THREAD_COUNT =
+      SHARED_CACHE_PREFIX + "nm.uploader.thread-count";
+  public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 5c86c2d..c4e756d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -159,6 +159,7 @@ message LocalResourceProto {
   optional LocalResourceTypeProto type = 4;
   optional LocalResourceVisibilityProto visibility = 5;
   optional string pattern = 6;
+  optional bool should_be_uploaded_to_shared_cache = 7;
 }
 
 message ApplicationResourceUsageReportProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
index 16bd597..560b081 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
@@ -192,6 +192,26 @@ public class LocalResourcePBImpl extends LocalResource {
     builder.setPattern(pattern);
   }
 
+  @Override
+  public synchronized boolean getShouldBeUploadedToSharedCache() {
+    LocalResourceProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasShouldBeUploadedToSharedCache()) {
+      return false;
+    }
+    return p.getShouldBeUploadedToSharedCache();
+  }
+
+  @Override
+  public synchronized void setShouldBeUploadedToSharedCache(
+      boolean shouldBeUploadedToSharedCache) {
+    maybeInitBuilder();
+    if (!shouldBeUploadedToSharedCache) {
+      builder.clearShouldBeUploadedToSharedCache();
+      return;
+    }
+    builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
+  }
+
   private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) {
     return ProtoUtils.convertToProtoFormat(e);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java
new file mode 100644
index 0000000..24ceeae
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Private
+@Evolving
+/**
+ * The SHA-256 implementation of the shared cache checksum interface.
+ */
+public class ChecksumSHA256Impl implements SharedCacheChecksum {
+  public String computeChecksum(InputStream in) throws IOException {
+    return DigestUtils.sha256Hex(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java
new file mode 100644
index 0000000..7e6fdda
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+/**
+ * An interface to calculate a checksum for a resource in the shared cache. The
+ * checksum implementation should be thread safe.
+ */
+public interface SharedCacheChecksum {
+
+  /**
+   * Calculate the checksum of the passed input stream.
+   *
+   * @param in <code>InputStream</code> to be checksummed
+   * @return the message digest of the input stream
+   * @throws IOException
+   */
+  public String computeChecksum(InputStream in) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java
new file mode 100644
index 0000000..cbfd95d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+@SuppressWarnings("unchecked")
+@Public
+@Evolving
+/**
+ * A factory class for creating checksum objects based on a configurable
+ * algorithm implementation
+ */
+public class SharedCacheChecksumFactory {
+  private static final
+      ConcurrentMap<Class<? extends SharedCacheChecksum>,SharedCacheChecksum>
+      instances =
+          new ConcurrentHashMap<Class<? extends SharedCacheChecksum>,
+          SharedCacheChecksum>();
+
+  private static final Class<? extends SharedCacheChecksum> defaultAlgorithm;
+
+  static {
+    try {
+      defaultAlgorithm = (Class<? extends SharedCacheChecksum>)
+          Class.forName(
+              YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
+    } catch (Exception e) {
+      // cannot happen
+      throw new ExceptionInInitializerError(e);
+    }
+  }
+
+  /**
+   * Get a new <code>SharedCacheChecksum</code> object based on the configurable
+   * algorithm implementation
+   * (see <code>yarn.sharedcache.checksum.algo.impl</code>)
+   *
+   * @return <code>SharedCacheChecksum</code> object
+   */
+  public static SharedCacheChecksum getChecksum(Configuration conf) {
+    Class<? extends SharedCacheChecksum> clazz =
+        conf.getClass(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
+        defaultAlgorithm, SharedCacheChecksum.class);
+    SharedCacheChecksum checksum = instances.get(clazz);
+    if (checksum == null) {
+      try {
+        checksum = ReflectionUtils.newInstance(clazz, conf);
+        SharedCacheChecksum old = instances.putIfAbsent(clazz, checksum);
+        if (old != null) {
+          checksum = old;
+        }
+      } catch (Exception e) {
+        throw new YarnRuntimeException(e);
+      }
+    }
+
+    return checksum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index 2737cce..436cb31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -134,8 +135,8 @@ public class FSDownload implements Callable<Path> {
    * @return true if the path in the current path is visible to all, false
    * otherwise
    */
-  @VisibleForTesting
-  static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
+  @Private
+  public static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
       LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
     current = fs.makeQualified(current);
     //the leaf level file should be readable by others

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index af4a5eb..af3b5aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1458,6 +1458,24 @@
     <value>50</value>
   </property>
 
+  <property>
+    <description>The algorithm used to compute checksums of files (SHA-256 by default)</description>
+    <name>yarn.sharedcache.checksum.algo.impl</name>
+    <value>org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl</value>
+  </property>
+
+  <property>
+    <description>The replication factor for the node manager uploader for the shared cache (10 by default)</description>
+    <name>yarn.sharedcache.nm.uploader.replication.factor</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>The number of threads used to upload files from a node manager instance (20 by default)</description>
+    <name>yarn.sharedcache.nm.uploader.thread-count</name>
+    <value>20</value>
+  </property>
+
   <!-- Other configuration -->
   <property>
     <description>The interval that the yarn client library uses to poll the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index a7e5d9c..1b32671 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -70,6 +70,7 @@ import com.google.common.annotations.VisibleForTesting;
  * Builder utilities to construct various objects.
  *
  */
+@Private
 public class BuilderUtils {
 
   private static final RecordFactory recordFactory = RecordFactoryProvider
@@ -94,7 +95,8 @@ public class BuilderUtils {
   }
 
   public static LocalResource newLocalResource(URL url, LocalResourceType type,
-      LocalResourceVisibility visibility, long size, long timestamp) {
+      LocalResourceVisibility visibility, long size, long timestamp,
+      boolean shouldBeUploadedToSharedCache) {
     LocalResource resource =
       recordFactory.newRecordInstance(LocalResource.class);
     resource.setResource(url);
@@ -102,14 +104,15 @@ public class BuilderUtils {
     resource.setVisibility(visibility);
     resource.setSize(size);
     resource.setTimestamp(timestamp);
+    resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
     return resource;
   }
 
   public static LocalResource newLocalResource(URI uri,
       LocalResourceType type, LocalResourceVisibility visibility, long size,
-      long timestamp) {
+      long timestamp, boolean shouldBeUploadedToSharedCache) {
     return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type,
-        visibility, size, timestamp);
+        visibility, size, timestamp, shouldBeUploadedToSharedCache);
   }
 
   public static ApplicationId newApplicationId(RecordFactory recordFactory,
@@ -245,7 +248,6 @@ public class BuilderUtils {
     return newToken(Token.class, identifier, kind, password, service);
   }
 
-  @Private
   @VisibleForTesting
   public static Token newContainerToken(NodeId nodeId,
       byte[] password, ContainerTokenIdentifier tokenIdentifier) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 35b232f..bb277d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -119,6 +119,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
@@ -227,6 +229,13 @@ public class ContainerManagerImpl extends CompositeService implements
     addIfService(logHandler);
     dispatcher.register(LogHandlerEventType.class, logHandler);
     
+    // add the shared cache upload service (it will do nothing if the shared
+    // cache is disabled)
+    SharedCacheUploadService sharedCacheUploader =
+        createSharedCacheUploaderService();
+    addService(sharedCacheUploader);
+    dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
+
     waitForContainersOnShutdownMillis =
         conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
             YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
@@ -367,6 +376,10 @@ public class ContainerManagerImpl extends CompositeService implements
         deletionContext, dirsHandler, context);
   }
 
+  protected SharedCacheUploadService createSharedCacheUploaderService() {
+    return new SharedCacheUploadService();
+  }
+
   protected ContainersLauncher createContainersLauncher(Context context,
       ContainerExecutor exec) {
     return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index fa54ee1..6b65a54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -27,6 +27,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -59,6 +60,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
@@ -104,6 +107,10 @@ public class ContainerImpl implements Container {
     new ArrayList<LocalResourceRequest>();
   private final List<LocalResourceRequest> appRsrcs =
     new ArrayList<LocalResourceRequest>();
+  private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
+      new ConcurrentHashMap<LocalResourceRequest, Path>();
+  private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
+      new ConcurrentHashMap<LocalResourceRequest, Boolean>();
 
   // whether container has been recovered after a restart
   private RecoveredContainerStatus recoveredStatus =
@@ -637,6 +644,8 @@ public class ContainerImpl implements Container {
                 container.pendingResources.put(req, links);
               }
               links.add(rsrc.getKey());
+              storeSharedCacheUploadPolicy(container, req, rsrc.getValue()
+                  .getShouldBeUploadedToSharedCache());
               switch (rsrc.getValue().getVisibility()) {
               case PUBLIC:
                 container.publicRsrcs.add(req);
@@ -686,30 +695,76 @@ public class ContainerImpl implements Container {
   }
 
   /**
+   * Records the shared cache upload policy of a requested resource.
+   * A LocalResourceRequest can be shared across containers in
+   * LocalResourcesTrackerImpl, so the policy is kept on the container here.
+   * An application may also pass several "identical" LocalResources (with
+   * different symlinks) to ContainerLaunchContext.setLocalResources, and
+   * those entries may disagree on the upload policy. In that corner case the
+   * stored policy becomes true as soon as one entry asks for an upload; a
+   * stored true is never replaced by false.
+   */
+  private static void storeSharedCacheUploadPolicy(ContainerImpl container,
+      LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
+    Boolean existing = container.resourcesUploadPolicies.get(resourceRequest);
+    // write only on first sight, or to upgrade a stored false to true
+    boolean upgrade = existing != null && !existing && uploadPolicy;
+    if (existing == null || upgrade) {
+      container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
+    }
+  }
+
+  /**
    * Transition when one of the requested resources for this container
    * has been successfully localized.
    */
   static class LocalizedTransition implements
       MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
+    @SuppressWarnings("unchecked")
     @Override
     public ContainerState transition(ContainerImpl container,
         ContainerEvent event) {
       ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
-      List<String> syms =
-          container.pendingResources.remove(rsrcEvent.getResource());
+      LocalResourceRequest resourceRequest = rsrcEvent.getResource();
+      Path location = rsrcEvent.getLocation();
+      List<String> syms = container.pendingResources.remove(resourceRequest);
       if (null == syms) {
-        LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
+        LOG.warn("Localized unknown resource " + resourceRequest +
                  " for container " + container.containerId);
         assert false;
         // fail container?
         return ContainerState.LOCALIZING;
       }
-      container.localizedResources.put(rsrcEvent.getLocation(), syms);
+      container.localizedResources.put(location, syms);
+
+      // check to see if this resource should be uploaded to the shared cache
+      // as well
+      if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
+        container.resourcesToBeUploaded.put(resourceRequest, location);
+      }
       if (!container.pendingResources.isEmpty()) {
         return ContainerState.LOCALIZING;
       }
 
       container.sendLaunchEvent();
+
+      // If this is a recovered container that has already launched, skip
+      // uploading resources to the shared cache. We do this to avoid uploading
+      // the same resources multiple times. The tradeoff is that in the case of
+      // a recovered container, there is a chance that resources don't get
+      // uploaded into the shared cache. This is OK because resources are not
+      // acknowledged by the SCM until they have been uploaded by the node
+      // manager.
+      if (container.recoveredStatus != RecoveredContainerStatus.LAUNCHED
+          && container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
+        // kick off uploads to the shared cache
+        container.dispatcher.getEventHandler().handle(
+            new SharedCacheUploadEvent(container.resourcesToBeUploaded, container
+                .getLaunchContext(), container.getUser(),
+                SharedCacheUploadEventType.UPLOAD));
+      }
+
       container.metrics.endInitingContainer();
       return ContainerState.LOCALIZED;
     }
@@ -1018,4 +1073,13 @@ public class ContainerImpl implements Container {
   private boolean hasDefaultExitCode() {
     return (this.exitCode == ContainerExitStatus.INVALID);
   }
+
+  /**
+   * Returns whether the specific resource should be uploaded to the shared
+   * cache. A resource without a recorded upload policy is treated as "do not
+   * upload" rather than failing on auto-unboxing a null Boolean.
+   */
+  private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
+      LocalResourceRequest resource) {
+    Boolean policy = container.resourcesUploadPolicies.get(resource);
+    return Boolean.TRUE.equals(policy);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
index 70bead7..607d0b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
@@ -152,6 +152,17 @@ public class LocalResourceRequest
   }
   
   @Override
+  public boolean getShouldBeUploadedToSharedCache() {
+    // Always throws: the upload policy cannot be read from a request.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setShouldBeUploadedToSharedCache(
+      boolean shouldBeUploadedToSharedCache) {
+    // Always throws: the upload policy cannot be set on a request.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void setResource(URL resource) {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java
new file mode 100644
index 0000000..2be080e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+/**
+ * Event that carries a set of resources to be uploaded to the shared cache,
+ * together with a container launch context and a user name. All fields are
+ * final; the resource map is kept by reference and is not copied.
+ */
+@Private
+@Unstable
+public class SharedCacheUploadEvent extends
+    AbstractEvent<SharedCacheUploadEventType> {
+  private final String user;
+  private final ContainerLaunchContext context;
+  private final Map<LocalResourceRequest,Path> resources;
+
+  /**
+   * @param resources the resources to upload, each mapped to a path
+   * @param context the container launch context to carry with the event
+   * @param user the user name to carry with the event
+   * @param eventType the type of this event
+   */
+  public SharedCacheUploadEvent(Map<LocalResourceRequest,Path> resources,
+      ContainerLaunchContext context, String user,
+      SharedCacheUploadEventType eventType) {
+    super(eventType);
+    this.user = user;
+    this.context = context;
+    this.resources = resources;
+  }
+
+  /** Returns the user name given at construction. */
+  public String getUser() {
+    return user;
+  }
+
+  /** Returns the container launch context given at construction. */
+  public ContainerLaunchContext getContainerLaunchContext() {
+    return context;
+  }
+
+  /** Returns the same map instance that was given at construction. */
+  public Map<LocalResourceRequest,Path> getResources() {
+    return resources;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java
new file mode 100644
index 0000000..5ba7e1b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * The types of SharedCacheUploadEvent. There is currently a single type.
+ */
+@Private
+@Unstable
+public enum SharedCacheUploadEventType {
+  // the only event type defined so far
+  UPLOAD
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
new file mode 100644
index 0000000..cb11f99
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
@@ -0,0 +1,126 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Service that uploads localized files to the shared cache. The upload is
+ * considered not critical, and is done on a best-effort basis. Failure to
+ * upload is not fatal.
+ */
+@Private
+@Unstable
+public class SharedCacheUploadService extends AbstractService implements
+    EventHandler<SharedCacheUploadEvent> {
+  private static final Log LOG =
+      LogFactory.getLog(SharedCacheUploadService.class);
+
+  // whether the shared cache is enabled in the configuration
+  private boolean enabled;
+  // filesystem obtained from FileSystem.get(conf); handed to the uploaders
+  private FileSystem fs;
+  private FileSystem localFs;
+  private ExecutorService uploaderPool;
+  private SCMUploaderProtocol scmClient;
+
+  public SharedCacheUploadService() {
+    super(SharedCacheUploadService.class.getName());
+  }
+
+  /**
+   * Reads the shared cache settings. When the shared cache is enabled, this
+   * creates the uploader thread pool, the SCM uploader client and the two
+   * filesystem handles; when it is disabled none of them is created.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    enabled = conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED,
+        YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED);
+    if (enabled) {
+      int threadCount =
+          conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT,
+              YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT);
+      uploaderPool = Executors.newFixedThreadPool(threadCount,
+          new ThreadFactoryBuilder().
+            setNameFormat("Shared cache uploader #%d").
+            build());
+      scmClient = createSCMClient(conf);
+      try {
+        fs = FileSystem.get(conf);
+        localFs = FileSystem.getLocal(conf);
+      } catch (IOException e) {
+        LOG.error("Unexpected exception in getting the filesystem", e);
+        throw new RuntimeException(e);
+      }
+    }
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Creates the RPC proxy for the SCM uploader protocol, using the address
+   * configured under YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS.
+   */
+  private SCMUploaderProtocol createSCMClient(Configuration conf) {
+    YarnRPC rpc = YarnRPC.create(conf);
+    InetSocketAddress scmAddress =
+        conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
+    return (SCMUploaderProtocol)rpc.getProxy(
+        SCMUploaderProtocol.class, scmAddress, conf);
+  }
+
+  /**
+   * Shuts down the uploader pool and stops the SCM client proxy, if they
+   * were created.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (enabled) {
+      // serviceInit can fail after "enabled" is set but before these fields
+      // are assigned, so guard against null to still reach super.serviceStop()
+      if (uploaderPool != null) {
+        uploaderPool.shutdown();
+      }
+      if (scmClient != null) {
+        RPC.stopProxy(scmClient);
+      }
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Submits one SharedCacheUploader task to the pool for every resource in
+   * the event. Does nothing when the shared cache is disabled.
+   */
+  @Override
+  public void handle(SharedCacheUploadEvent event) {
+    if (enabled) {
+      Map<LocalResourceRequest,Path> resources = event.getResources();
+      for (Map.Entry<LocalResourceRequest,Path> e: resources.entrySet()) {
+        SharedCacheUploader uploader =
+            new SharedCacheUploader(e.getKey(), e.getValue(), event.getUser(),
+                getConfig(), scmClient, fs, localFs);
+        // fire off an upload task
+        uploaderPool.submit(uploader);
+      }
+    }
+  }
+
+  /** Returns whether the shared cache was enabled when this was inited. */
+  public boolean isEnabled() {
+    return enabled;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java
new file mode 100644
index 0000000..050d531
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java
@@ -0,0 +1,289 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.URISyntaxException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum;
+import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The callable class that handles the actual upload to the shared cache.
+ */
+class SharedCacheUploader implements Callable<Boolean> {
+  // rwxr-xr-x
+  static final FsPermission DIRECTORY_PERMISSION =
+      new FsPermission((short)00755);
+  // r-xr-xr-x
+  static final FsPermission FILE_PERMISSION =
+      new FsPermission((short)00555);
+
+  private static final Log LOG = LogFactory.getLog(SharedCacheUploader.class);
+  private static final ThreadLocal<Random> randomTl =
+      new ThreadLocal<Random>() {
+        @Override
+        protected Random initialValue() {
+          return new Random(System.nanoTime());
+        }
+      };
+
+  private final LocalResource resource;
+  private final Path localPath;
+  private final String user;
+  private final Configuration conf;
+  private final SCMUploaderProtocol scmClient;
+  private final FileSystem fs;
+  private final FileSystem localFs;
+  private final String sharedCacheRootDir;
+  private final int nestedLevel;
+  private final SharedCacheChecksum checksum;
+  private final RecordFactory recordFactory;
+
+  /**
+   * Convenience constructor that uses FileSystem.get(conf) as the filesystem
+   * of the shared cache and the filesystem of localPath as the local
+   * filesystem.
+   *
+   * @throws IOException if either filesystem cannot be obtained
+   */
+  public SharedCacheUploader(LocalResource resource, Path localPath,
+      String user, Configuration conf, SCMUploaderProtocol scmClient)
+          throws IOException {
+    this(resource, localPath, user, conf, scmClient,
+        FileSystem.get(conf), localPath.getFileSystem(conf));
+  }
+
+  /**
+   * @param resource the local resource that contains the original remote path
+   * @param localPath the path in the local filesystem where the resource is
+   * localized
+   * @param fs the filesystem of the shared cache
+   * @param localFs the local filesystem
+   */
+  public SharedCacheUploader(LocalResource resource, Path localPath,
+      String user, Configuration conf, SCMUploaderProtocol scmClient,
+      FileSystem fs, FileSystem localFs) {
+    this.resource = resource;
+    this.localPath = localPath;
+    this.user = user;
+    this.conf = conf;
+    this.scmClient = scmClient;
+    this.fs = fs;
+    this.sharedCacheRootDir =
+        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+    this.nestedLevel = SharedCacheUtil.getCacheDepth(conf);
+    this.checksum = SharedCacheChecksumFactory.getChecksum(conf);
+    this.localFs = localFs;
+    this.recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  }
+
+  /**
+   * Uploads the file under the shared cache, and notifies the shared cache
+   * manager.
+   *
+   * @return true if the file was uploaded and accepted by the shared cache
+   * manager; false if the access check fails, the copy fails, the rename to
+   * the final name fails, or the shared cache manager rejects the upload
+   * @throws IOException if any step fails with an IOException; the temporary
+   * file, if any, is deleted on a best-effort basis before rethrowing
+   */
+  @Override
+  public Boolean call() throws Exception {
+    Path tempPath = null;
+    try {
+      if (!verifyAccess()) {
+        LOG.warn("User " + user + " is not authorized to upload file " +
+            localPath.getName());
+        return false;
+      }
+
+      // first determine the actual local path that will be used for upload
+      Path actualPath = getActualPath();
+      // compute the checksum
+      String checksumVal = computeChecksum(actualPath);
+      // create the directory (if it doesn't exist)
+      Path directoryPath =
+          new Path(SharedCacheUtil.getCacheEntryPath(nestedLevel,
+              sharedCacheRootDir, checksumVal));
+      // let's not check if the directory already exists: in the vast majority
+      // of the cases, the directory does not exist; as long as mkdirs does not
+      // error out if it exists, we should be fine
+      fs.mkdirs(directoryPath, DIRECTORY_PERMISSION);
+      // create the temporary file
+      tempPath = new Path(directoryPath, getTemporaryFileName(actualPath));
+      if (!uploadFile(actualPath, tempPath)) {
+        LOG.warn("Could not copy the file to the shared cache at " + tempPath);
+        return false;
+      }
+
+      // set the permission so that it is readable but not writable
+      fs.setPermission(tempPath, FILE_PERMISSION);
+      // rename it to the final filename
+      Path finalPath = new Path(directoryPath, actualPath.getName());
+      if (!fs.rename(tempPath, finalPath)) {
+        LOG.warn("The file already exists under " + finalPath +
+            ". Ignoring this attempt.");
+        deleteTempFile(tempPath);
+        return false;
+      }
+
+      // notify the SCM
+      if (!notifySharedCacheManager(checksumVal, actualPath.getName())) {
+        // the shared cache manager rejected the upload (it is likely already
+        // uploaded under a different name); clean up this file and exit
+        fs.delete(finalPath, false);
+        return false;
+      }
+
+      // set the replication factor
+      short replication =
+          (short)conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR,
+              YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR);
+      fs.setReplication(finalPath, replication);
+      LOG.info("File " + actualPath.getName() +
+          " was uploaded to the shared cache at " + finalPath);
+      return true;
+    } catch (IOException e) {
+      LOG.warn("Exception while uploading the file " + localPath.getName(), e);
+      // in case an exception is thrown, delete the temp file
+      deleteTempFile(tempPath);
+      throw e;
+    }
+  }
+
+  /**
+   * Determines the local path of the file that is actually uploaded. When
+   * the localized path is a directory, the file with the same name inside
+   * that directory is returned; otherwise the localized path itself.
+   */
+  @VisibleForTesting
+  Path getActualPath() throws IOException {
+    FileStatus localStatus = localFs.getFileStatus(localPath);
+    boolean isUnpackedDir = localStatus != null && localStatus.isDirectory();
+    if (!isUnpackedDir) {
+      return localPath;
+    }
+    // for certain types of resources that get unpacked, the original file may
+    // be found under the directory with the same name (see
+    // FSDownload.unpack), so look under the directory
+    return new Path(localPath, localPath.getName());
+  }
+
+  /**
+   * Deletes the temporary file if it exists. This is a best-effort clean-up:
+   * a failure is logged and never propagated to the caller.
+   *
+   * @param tempPath the temporary file to delete; null is a no-op
+   */
+  private void deleteTempFile(Path tempPath) {
+    if (tempPath == null) {
+      return;
+    }
+    try {
+      if (fs.exists(tempPath)) {
+        fs.delete(tempPath, false);
+      }
+    } catch (IOException e) {
+      // do not fail the caller over a clean-up problem, but leave a trace
+      LOG.warn("Could not delete the temporary file " + tempPath, e);
+    }
+  }
+
+  /**
+   * Checks whether the resource may be uploaded for the user: it must have
+   * PUBLIC visibility, or the (original) remote file must be unmodified
+   * since the resource timestamp and either owned by the user or public.
+   *
+   * @throws IOException if the resource URL is invalid or the remote file
+   * status cannot be obtained
+   */
+  @VisibleForTesting
+  boolean verifyAccess() throws IOException {
+    // a resource in the public cache is trivially OK
+    if (LocalResourceVisibility.PUBLIC == resource.getVisibility()) {
+      return true;
+    }
+
+    final Path remotePath;
+    try {
+      remotePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
+    } catch (URISyntaxException e) {
+      throw new IOException("Invalid resource", e);
+    }
+
+    // look up the current status of the original remote file
+    FileSystem remoteFs = remotePath.getFileSystem(conf);
+    FileStatus status = remoteFs.getFileStatus(remotePath);
+
+    // refuse a file whose modification time no longer matches the resource
+    boolean unmodified =
+        status.getModificationTime() == resource.getTimestamp();
+    if (!unmodified) {
+      LOG.warn("The remote file " + remotePath +
+          " has changed since it's localized; will not consider it for upload");
+      return false;
+    }
+
+    // the owner is always allowed; anyone else only if the file is public
+    boolean ownedByUser = status.getOwner().equals(user);
+    return ownedByUser || fileIsPublic(remotePath, remoteFs, status);
+  }
+
+  /**
+   * Returns the result of FSDownload.isPublic for the given remote file.
+   * NOTE(review): null is passed as the last argument, presumably meaning
+   * that no stat cache is used -- confirm against FSDownload.isPublic.
+   */
+  @VisibleForTesting
+  boolean fileIsPublic(final Path remotePath, FileSystem remoteFs,
+      FileStatus status) throws IOException {
+    return FSDownload.isPublic(remoteFs, remotePath, status, null);
+  }
+
+  /**
+   * Uploads the file to the shared cache under a temporary name, and returns
+   * the result.
+   *
+   * @param sourcePath the file to copy, read through the local filesystem
+   * @param tempPath the temporary destination, written through the shared
+   * cache filesystem
+   * @return the value returned by FileUtil.copy
+   */
+  @VisibleForTesting
+  boolean uploadFile(Path sourcePath, Path tempPath) throws IOException {
+    // the boolean argument is FileUtil.copy's deleteSource flag: the local
+    // file is left in place
+    return FileUtil.copy(localFs, sourcePath, fs, tempPath, false, conf);
+  }
+
+  @VisibleForTesting
+  String computeChecksum(Path path) throws IOException {
+    InputStream is = localFs.open(path);
+    try {
+      return checksum.computeChecksum(is);
+    } finally {
+      try { is.close(); } catch (IOException ignore) {}
+    }
+  }
+
+  /**
+   * Builds a temporary file name: the name of the given path, a dash, and a
+   * random long taken from the thread-local Random.
+   */
+  private String getTemporaryFileName(Path path) {
+    String baseName = path.getName();
+    long randomSuffix = randomTl.get().nextLong();
+    return baseName + "-" + randomSuffix;
+  }
+
+  /**
+   * Notifies the shared cache manager of an upload and returns whether it
+   * accepted the upload.
+   *
+   * @param checksumVal the checksum, sent as the resource key
+   * @param fileName the file name sent with the request
+   * @throws IOException wrapping a YarnException, or the cause of an
+   * UndeclaredThrowableException (the exception itself if it has no cause)
+   */
+  @VisibleForTesting
+  boolean notifySharedCacheManager(String checksumVal, String fileName)
+      throws IOException {
+    try {
+      SCMUploaderNotifyRequest notification =
+          recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+      notification.setResourceKey(checksumVal);
+      notification.setFilename(fileName);
+      return scmClient.notify(notification).getAccepted();
+    } catch (YarnException e) {
+      throw new IOException(e);
+    } catch (UndeclaredThrowableException e) {
+      // surface the underlying cause when there is one
+      Throwable cause = e.getCause();
+      throw new IOException(cause == null ? e : cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 8f7fa78..c28d691 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -642,7 +642,7 @@ public class TestContainer {
     URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name);
     LocalResource rsrc =
         BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
-            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
     return new SimpleEntry<String, LocalResource>(name, rsrc);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index bf36651..1051e7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -1760,7 +1760,7 @@ public class TestResourceLocalizationService {
     URL url = getPath("/local/PRIVATE/" + name);
     LocalResource rsrc =
         BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
-            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
     return rsrc;
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java
new file mode 100644
index 0000000..1b2b2f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java
@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import static org.junit.Assert.assertSame;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+public class TestSharedCacheUploadService {
+
+  @Test
+  public void testInitDisabled() {
+    testInit(false);
+  }
+
+  @Test
+  public void testInitEnabled() {
+    testInit(true);
+  }
+
+  public void testInit(boolean enabled) {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, enabled);
+
+    SharedCacheUploadService service = new SharedCacheUploadService();
+    service.init(conf);
+    assertSame(enabled, service.isEnabled());
+
+    service.stop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0414303/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java
new file mode 100644
index 0000000..9234c62
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java
@@ -0,0 +1,241 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.junit.Test;
+
+public class TestSharedCacheUploader {
+
+  /**
+   * If verifyAccess fails, the upload should fail
+   */
+  @Test
+  public void testFailVerifyAccess() throws Exception {
+    SharedCacheUploader spied = createSpiedUploader();
+    doReturn(false).when(spied).verifyAccess();
+
+    assertFalse(spied.call());
+  }
+
+  /**
+   * If rename fails, the upload should fail
+   */
+  @Test
+  public void testRenameFail() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    Path localPath = mock(Path.class);
+    when(localPath.getName()).thenReturn("foo.jar");
+    String user = "joe";
+    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+    SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class);
+    when(response.getAccepted()).thenReturn(true);
+    when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))).
+        thenReturn(response);
+    FileSystem fs = mock(FileSystem.class);
+    // return false when rename is called
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(false);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    SharedCacheUploader spied =
+        createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+            localFs);
+    // stub verifyAccess() to return true
+    doReturn(true).when(spied).verifyAccess();
+    // stub getActualPath()
+    doReturn(localPath).when(spied).getActualPath();
+    // stub computeChecksum()
+    doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+    // stub uploadFile() to return true
+    doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+
+    assertFalse(spied.call());
+  }
+
+  /**
+   * If verifyAccess, uploadFile, rename, and notification succeed, the upload
+   * should succeed
+   */
+  @Test
+  public void testSuccess() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    Path localPath = mock(Path.class);
+    when(localPath.getName()).thenReturn("foo.jar");
+    String user = "joe";
+    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+    SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class);
+    when(response.getAccepted()).thenReturn(true);
+    when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))).
+        thenReturn(response);
+    FileSystem fs = mock(FileSystem.class);
+    // return true when rename is called
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    SharedCacheUploader spied =
+        createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+            localFs);
+    // stub verifyAccess() to return true
+    doReturn(true).when(spied).verifyAccess();
+    // stub getActualPath()
+    doReturn(localPath).when(spied).getActualPath();
+    // stub computeChecksum()
+    doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+    // stub uploadFile() to return true
+    doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+    // stub notifySharedCacheManager to return true
+    doReturn(true).when(spied).notifySharedCacheManager(isA(String.class),
+        isA(String.class));
+
+    assertTrue(spied.call());
+  }
+
+  /**
+   * If verifyAccess, uploadFile, and rename succeed, but the SCM rejects the
+   * notification, the upload should fail and the file should be deleted
+   */
+  @Test
+  public void testNotifySCMFail() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    Path localPath = mock(Path.class);
+    when(localPath.getName()).thenReturn("foo.jar");
+    String user = "joe";
+    FileSystem fs = mock(FileSystem.class);
+    // return true when rename is called
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    SharedCacheUploader spied =
+        createSpiedUploader(resource, localPath, user, conf, null, fs,
+            localFs);
+    // stub verifyAccess() to return true
+    doReturn(true).when(spied).verifyAccess();
+    // stub getActualPath()
+    doReturn(localPath).when(spied).getActualPath();
+    // stub computeChecksum()
+    doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+    // stub uploadFile() to return true
+    doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+    // stub notifySharedCacheManager to return false
+    doReturn(false).when(spied).notifySharedCacheManager(isA(String.class),
+        isA(String.class));
+
+    assertFalse(spied.call());
+    verify(fs).delete(isA(Path.class), anyBoolean());
+  }
+
+  /**
+   * If resource is public, verifyAccess should succeed
+   */
+  @Test
+  public void testVerifyAccessPublicResource() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    // give public visibility
+    when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
+    Path localPath = mock(Path.class);
+    when(localPath.getName()).thenReturn("foo.jar");
+    String user = "joe";
+    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+    FileSystem fs = mock(FileSystem.class);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    SharedCacheUploader spied =
+        createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+            localFs);
+
+    assertTrue(spied.verifyAccess());
+  }
+
+  /**
+   * If the localPath is a directory, getActualPath should return the path one
+   * level down with the same name
+   */
+  @Test
+  public void testGetActualPath() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    // give public visibility
+    when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
+    Path localPath = new Path("foo.jar");
+    String user = "joe";
+    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+    FileSystem fs = mock(FileSystem.class);
+    FileSystem localFs = mock(FileSystem.class);
+    // stub it to return a status that indicates a directory
+    FileStatus status = mock(FileStatus.class);
+    when(status.isDirectory()).thenReturn(true);
+    when(localFs.getFileStatus(localPath)).thenReturn(status);
+    SharedCacheUploader spied =
+        createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+            localFs);
+
+    Path actualPath = spied.getActualPath();
+    assertEquals(actualPath.getName(), localPath.getName());
+    assertEquals(actualPath.getParent().getName(), localPath.getName());
+  }
+
+  private SharedCacheUploader createSpiedUploader() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+    LocalResource resource = mock(LocalResource.class);
+    Path localPath = mock(Path.class);
+    String user = "foo";
+    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    return createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+        localFs);
+  }
+
+  private SharedCacheUploader createSpiedUploader(LocalResource resource, Path localPath,
+      String user, Configuration conf, SCMUploaderProtocol scmClient,
+      FileSystem fs, FileSystem localFs)
+          throws IOException {
+    SharedCacheUploader uploader = new SharedCacheUploader(resource, localPath, user, conf, scmClient,
+        fs, localFs);
+    return spy(uploader);
+  }
+}