You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/10/31 21:01:03 UTC

git commit: YARN-2186. [YARN-1492] Node Manager uploader service for cache manager. (Chris Trezzo and Sangjin Lee via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk f1a149e91 -> 256697acd


YARN-2186. [YARN-1492] Node Manager uploader service for cache manager. (Chris Trezzo and Sangjin Lee via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/256697ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/256697ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/256697ac

Branch: refs/heads/trunk
Commit: 256697acd5ec16bca022ae86e22f9882b3309d8b
Parents: f1a149e
Author: Karthik Kambatla <ka...@apache.org>
Authored: Fri Oct 31 13:00:42 2014 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Fri Oct 31 13:00:42 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  16 +-
 .../src/main/resources/yarn-default.xml         |  14 ++
 .../hadoop-yarn-server-common/pom.xml           |   1 +
 .../yarn/server/api/SCMUploaderProtocol.java    |  83 ++++++++
 .../yarn/server/api/SCMUploaderProtocolPB.java  |  28 +++
 .../client/SCMUploaderProtocolPBClientImpl.java |  93 +++++++++
 .../SCMUploaderProtocolPBServiceImpl.java       |  79 ++++++++
 .../SCMUploaderCanUploadRequest.java            |  49 +++++
 .../SCMUploaderCanUploadResponse.java           |  52 +++++
 .../SCMUploaderNotifyRequest.java               |  67 +++++++
 .../SCMUploaderNotifyResponse.java              |  51 +++++
 .../pb/SCMUploaderCanUploadRequestPBImpl.java   |  78 ++++++++
 .../pb/SCMUploaderCanUploadResponsePBImpl.java  |  75 ++++++++
 .../impl/pb/SCMUploaderNotifyRequestPBImpl.java |  93 +++++++++
 .../pb/SCMUploaderNotifyResponsePBImpl.java     |  73 +++++++
 .../src/main/proto/SCMUploader.proto            |  30 +++
 .../yarn_server_common_service_protos.proto     |  19 +-
 .../sharedcachemanager/SharedCacheManager.java  |   9 +
 .../SharedCacheUploaderService.java             | 140 ++++++++++++++
 .../metrics/SharedCacheUploaderMetrics.java     | 105 +++++++++++
 .../TestSharedCacheUploaderService.java         | 188 +++++++++++++++++++
 22 files changed, 1344 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 011083c..922b6fa 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -42,6 +42,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2183. [YARN-1492] Cleaner service for cache manager.
     (Chris Trezzo and Sangjin Lee via kasha)
 
+    YARN-2186. [YARN-1492] Node Manager uploader service for cache manager. 
+    (Chris Trezzo and Sangjin Lee via kasha)
+
   IMPROVEMENTS
 
     YARN-1979. TestDirectoryCollection fails when the umask is unusual.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d8ed541..b459ee3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
@@ -1446,6 +1445,21 @@ public class YarnConfiguration extends Configuration {
       SCM_CLEANER_PREFIX + "resource-sleep-ms";
   public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L;
 
+  /** The address of the node manager interface in the SCM. */
+  public static final String SCM_UPLOADER_SERVER_ADDRESS = SHARED_CACHE_PREFIX
+      + "uploader.server.address";
+  public static final int DEFAULT_SCM_UPLOADER_SERVER_PORT = 8046;
+  public static final String DEFAULT_SCM_UPLOADER_SERVER_ADDRESS = "0.0.0.0:"
+      + DEFAULT_SCM_UPLOADER_SERVER_PORT;
+
+  /**
+   * The number of SCM threads used to handle notify requests from the node
+   * manager.
+   */
+  public static final String SCM_UPLOADER_SERVER_THREAD_COUNT =
+      SHARED_CACHE_PREFIX + "uploader.server.thread-count";
+  public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index d02931f..1e7d544 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1434,6 +1434,20 @@
     <value>0</value>
   </property>
 
+  <property>
+    <description>The address of the node manager interface in the SCM
+    (shared cache manager)</description>
+    <name>yarn.sharedcache.uploader.server.address</name>
+    <value>0.0.0.0:8046</value>
+  </property>
+
+  <property>
+    <description>The number of threads used to handle shared cache manager
+    requests from the node manager (50 by default)</description>
+    <name>yarn.sharedcache.uploader.server.thread-count</name>
+    <value>50</value>
+  </property>
+
   <!-- Other configuration -->
   <property>
     <description>The interval that the yarn client library uses to poll the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index acf330f..35eacc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -135,6 +135,7 @@
                   <include>yarn_server_common_service_protos.proto</include>
                   <include>yarn_server_common_service_protos.proto</include>
                   <include>ResourceTracker.proto</include>
+                  <include>SCMUploader.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java
new file mode 100644
index 0000000..937f648
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+
+/**
+ * <p>
+ * The protocol between a <code>NodeManager's</code>
+ * <code>SharedCacheUploadService</code> and the
+ * <code>SharedCacheManager.</code>
+ * </p>
+ */
+@Private
+@Unstable
+public interface SCMUploaderProtocol {
+  /**
+   * <p>
+   * The method used by the NodeManager's <code>SharedCacheUploadService</code>
+   * to notify the shared cache manager of a newly cached resource.
+   * </p>
+   *
+   * <p>
+   * The <code>SharedCacheManager</code> responds with whether or not the
+   * NodeManager should delete the uploaded file.
+   * </p>
+   *
+   * @param request notify the shared cache manager of a newly uploaded resource
+   *          to the shared cache
+   * @return response indicating if the newly uploaded resource should be
+   *         deleted
+   * @throws YarnException
+   * @throws IOException
+   */
+  public SCMUploaderNotifyResponse
+      notify(SCMUploaderNotifyRequest request)
+      throws YarnException, IOException;
+
+  /**
+   * <p>
+   * The method used by the NodeManager's <code>SharedCacheUploadService</code>
+   * to request whether a resource can be uploaded.
+   * </p>
+   *
+   * <p>
+   * The <code>SharedCacheManager</code> responds with whether or not the
+   * NodeManager can upload the file.
+   * </p>
+   *
+   * @param request whether the resource can be uploaded to the shared cache
+   * @return response indicating if resource can be uploaded to the shared cache
+   * @throws YarnException
+   * @throws IOException
+   */
+  public SCMUploaderCanUploadResponse
+      canUpload(SCMUploaderCanUploadRequest request)
+      throws YarnException, IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java
new file mode 100644
index 0000000..5099b7d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.SCMUploaderProtocol.SCMUploaderProtocolService;
+
+@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB",
+    protocolVersion = 1)
+public interface SCMUploaderProtocolPB extends
+    SCMUploaderProtocolService.BlockingInterface {
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java
new file mode 100644
index 0000000..31e4868
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyResponsePBImpl;
+
+import com.google.protobuf.ServiceException;
+
+public class SCMUploaderProtocolPBClientImpl implements
+    SCMUploaderProtocol, Closeable {
+
+  private SCMUploaderProtocolPB proxy;
+
+  public SCMUploaderProtocolPBClientImpl(long clientVersion,
+      InetSocketAddress addr, Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, SCMUploaderProtocolPB.class,
+      ProtobufRpcEngine.class);
+    proxy =
+        RPC.getProxy(SCMUploaderProtocolPB.class, clientVersion, addr, conf);
+  }
+
+  @Override
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+      this.proxy = null;
+    }
+  }
+
+  @Override
+  public SCMUploaderNotifyResponse notify(SCMUploaderNotifyRequest request)
+      throws YarnException, IOException {
+    SCMUploaderNotifyRequestProto requestProto =
+        ((SCMUploaderNotifyRequestPBImpl) request).getProto();
+    try {
+      return new SCMUploaderNotifyResponsePBImpl(proxy.notify(null,
+          requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public SCMUploaderCanUploadResponse canUpload(
+      SCMUploaderCanUploadRequest request) throws YarnException, IOException {
+    SCMUploaderCanUploadRequestProto requestProto =
+        ((SCMUploaderCanUploadRequestPBImpl)request).getProto();
+    try {
+      return new SCMUploaderCanUploadResponsePBImpl(proxy.canUpload(null,
+          requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java
new file mode 100644
index 0000000..db6c58c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProto;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyResponsePBImpl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class SCMUploaderProtocolPBServiceImpl implements
+    SCMUploaderProtocolPB {
+
+  private SCMUploaderProtocol real;
+
+  public SCMUploaderProtocolPBServiceImpl(SCMUploaderProtocol impl) {
+    this.real = impl;
+  }
+
+  @Override
+  public SCMUploaderNotifyResponseProto notify(RpcController controller,
+      SCMUploaderNotifyRequestProto proto) throws ServiceException {
+    SCMUploaderNotifyRequestPBImpl request =
+        new SCMUploaderNotifyRequestPBImpl(proto);
+    try {
+      SCMUploaderNotifyResponse response = real.notify(request);
+      return ((SCMUploaderNotifyResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public SCMUploaderCanUploadResponseProto canUpload(RpcController controller,
+      SCMUploaderCanUploadRequestProto proto)
+      throws ServiceException {
+    SCMUploaderCanUploadRequestPBImpl request =
+        new SCMUploaderCanUploadRequestPBImpl(proto);
+    try {
+      SCMUploaderCanUploadResponse response = real.canUpload(request);
+      return ((SCMUploaderCanUploadResponsePBImpl)response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java
new file mode 100644
index 0000000..bb6718a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The request from the NodeManager to the <code>SharedCacheManager</code> that
+ * requests whether it can upload a resource in the shared cache.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderCanUploadRequest {
+
+  /**
+   * Get the <code>key</code> of the resource that would be uploaded to the
+   * shared cache.
+   *
+   * @return <code>key</code>
+   */
+  public abstract String getResourceKey();
+
+  /**
+   * Set the <code>key</code> of the resource that would be uploaded to the
+   * shared cache.
+   *
+   * @param key unique identifier for the resource
+   */
+  public abstract void setResourceKey(String key);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java
new file mode 100644
index 0000000..5fb4626
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The response from the SharedCacheManager to the NodeManager that indicates
+ * whether the NodeManager can upload the resource to the shared cache. If it is
+ * not accepted by SCM, the NodeManager should not upload it to the shared
+ * cache.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderCanUploadResponse {
+
+  /**
+   * Get whether or not the node manager can upload the resource to the shared
+   * cache.
+   *
+   * @return boolean True if the resource can be uploaded, false otherwise.
+   */
+  public abstract boolean getUploadable();
+
+  /**
+   * Set whether or not the node manager can upload the resource to the shared
+   * cache.
+   *
+   * @param b True if the resource can be uploaded, false otherwise.
+   */
+  public abstract void setUploadable(boolean b);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java
new file mode 100644
index 0000000..c72453c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The request from the NodeManager to the <code>SharedCacheManager</code> that
+ * notifies that a resource has been uploaded to the shared cache. The
+ * <code>SharedCacheManager</code> may reject the resource for various reasons,
+ * in which case the NodeManager should remove it from the shared cache.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderNotifyRequest {
+
+  /**
+   * Get the filename of the resource that was just uploaded to the shared
+   * cache.
+   *
+   * @return the filename
+   */
+  public abstract String getFileName();
+
+  /**
+   * Set the filename of the resource that was just uploaded to the shared
+   * cache.
+   *
+   * @param filename the name of the file
+   */
+  public abstract void setFilename(String filename);
+
+  /**
+   * Get the <code>key</code> of the resource that was just uploaded to the
+   * shared cache.
+   *
+   * @return <code>key</code>
+   */
+  public abstract String getResourceKey();
+
+  /**
+   * Set the <code>key</code> of the resource that was just uploaded to the
+   * shared cache.
+   *
+   * @param key unique identifier for the resource
+   */
+  public abstract void setResourceKey(String key);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java
new file mode 100644
index 0000000..83d7913
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The response from the SharedCacheManager to the NodeManager that indicates
+ * whether the NodeManager needs to delete the cached resource it was sending
+ * the notification for.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderNotifyResponse {
+
+  /**
+   * Get whether or not the shared cache manager has accepted the notified
+   * resource (i.e. the uploaded file should remain in the cache).
+   *
+   * @return boolean True if the resource has been accepted, false otherwise.
+   */
+  public abstract boolean getAccepted();
+
+  /**
+   * Set whether or not the shared cache manager has accepted the notified
+   * resource (i.e. the uploaded file should remain in the cache).
+   *
+   * @param b True if the resource has been accepted, false otherwise.
+   */
+  public abstract void setAccepted(boolean b);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java
new file mode 100644
index 0000000..d350fb9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+
+public class SCMUploaderCanUploadRequestPBImpl
+    extends SCMUploaderCanUploadRequest {
+  SCMUploaderCanUploadRequestProto proto =
+      SCMUploaderCanUploadRequestProto.getDefaultInstance();
+  SCMUploaderCanUploadRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public SCMUploaderCanUploadRequestPBImpl() {
+    builder = SCMUploaderCanUploadRequestProto.newBuilder();
+  }
+
+  public SCMUploaderCanUploadRequestPBImpl(
+      SCMUploaderCanUploadRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public SCMUploaderCanUploadRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public String getResourceKey() {
+    SCMUploaderCanUploadRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasResourceKey()) ? p.getResourceKey() : null;
+  }
+
+  @Override
+  public void setResourceKey(String key) {
+    maybeInitBuilder();
+    if (key == null) {
+      builder.clearResourceKey();
+      return;
+    }
+    builder.setResourceKey(key);
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SCMUploaderCanUploadRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java
new file mode 100644
index 0000000..3f44e2a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+
+public class SCMUploaderCanUploadResponsePBImpl
+    extends SCMUploaderCanUploadResponse {
+  SCMUploaderCanUploadResponseProto proto =
+      SCMUploaderCanUploadResponseProto.getDefaultInstance();
+  SCMUploaderCanUploadResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public SCMUploaderCanUploadResponsePBImpl() {
+    builder = SCMUploaderCanUploadResponseProto.newBuilder();
+  }
+
+  public SCMUploaderCanUploadResponsePBImpl(
+      SCMUploaderCanUploadResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public SCMUploaderCanUploadResponseProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public boolean getUploadable() {
+    SCMUploaderCanUploadResponseProtoOrBuilder p = viaProto ? proto : builder;
+    // Default to true, when in doubt allow the upload
+    return (p.hasUploadable()) ? p.getUploadable() : true;
+  }
+
+  @Override
+  public void setUploadable(boolean b) {
+    maybeInitBuilder();
+    builder.setUploadable(b);
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SCMUploaderCanUploadResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java
new file mode 100644
index 0000000..9b52b11
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+
+public class SCMUploaderNotifyRequestPBImpl extends SCMUploaderNotifyRequest {
+  SCMUploaderNotifyRequestProto proto =
+      SCMUploaderNotifyRequestProto.getDefaultInstance();
+  SCMUploaderNotifyRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public SCMUploaderNotifyRequestPBImpl() {
+    builder = SCMUploaderNotifyRequestProto.newBuilder();
+  }
+
+  public SCMUploaderNotifyRequestPBImpl(
+      SCMUploaderNotifyRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public SCMUploaderNotifyRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public String getResourceKey() {
+    SCMUploaderNotifyRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasResourceKey()) ? p.getResourceKey() : null;
+  }
+
+  @Override
+  public void setResourceKey(String key) {
+    maybeInitBuilder();
+    if (key == null) {
+      builder.clearResourceKey();
+      return;
+    }
+    builder.setResourceKey(key);
+  }
+
+  @Override
+  public String getFileName() {
+    SCMUploaderNotifyRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasFilename()) ? p.getFilename() : null;
+  }
+
+  @Override
+  public void setFilename(String filename) {
+    maybeInitBuilder();
+    if (filename == null) {
+      builder.clearFilename();
+      return;
+    }
+    builder.setFilename(filename);
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SCMUploaderNotifyRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java
new file mode 100644
index 0000000..c899bbe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+
+public class SCMUploaderNotifyResponsePBImpl extends SCMUploaderNotifyResponse {
+  SCMUploaderNotifyResponseProto proto =
+      SCMUploaderNotifyResponseProto.getDefaultInstance();
+  SCMUploaderNotifyResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public SCMUploaderNotifyResponsePBImpl() {
+    builder = SCMUploaderNotifyResponseProto.newBuilder();
+  }
+
+  public SCMUploaderNotifyResponsePBImpl(SCMUploaderNotifyResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public SCMUploaderNotifyResponseProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public boolean getAccepted() {
+    SCMUploaderNotifyResponseProtoOrBuilder p = viaProto ? proto : builder;
+    // Default to true, when in doubt just leave the file in the cache
+    return (p.hasAccepted()) ? p.getAccepted() : true;
+  }
+
+  @Override
+  public void setAccepted(boolean b) {
+    maybeInitBuilder();
+    builder.setAccepted(b);
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SCMUploaderNotifyResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto
new file mode 100644
index 0000000..2278422
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "SCMUploaderProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_server_common_service_protos.proto";
+
+service SCMUploaderProtocolService {
+  rpc notify(SCMUploaderNotifyRequestProto) returns (SCMUploaderNotifyResponseProto);
+  rpc canUpload(SCMUploaderCanUploadRequestProto) returns (SCMUploaderCanUploadResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index f2d01ad..91473c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -75,4 +75,21 @@ message NMContainerStatusProto {
   optional string diagnostics = 5 [default = "N/A"];
   optional int32 container_exit_status = 6;
   optional int64 creation_time = 7;
-}
\ No newline at end of file
+}
+
+message SCMUploaderNotifyRequestProto {
+  optional string resource_key = 1;
+  optional string filename = 2;
+}
+
+message SCMUploaderNotifyResponseProto {
+  optional bool accepted = 1;
+}
+
+message SCMUploaderCanUploadRequestProto {
+  optional string resource_key = 1;
+}
+
+message SCMUploaderCanUploadResponseProto {
+  optional bool uploadable = 1;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
index 3fdb588..ab50727 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -67,6 +67,10 @@ public class SharedCacheManager extends CompositeService {
     CleanerService cs = createCleanerService(store);
     addService(cs);
 
+    SharedCacheUploaderService nms =
+        createNMCacheUploaderSCMProtocolService(store);
+    addService(nms);
+
     // init metrics
     DefaultMetricsSystem.initialize("SharedCacheManager");
     JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -97,6 +101,11 @@ public class SharedCacheManager extends CompositeService {
     return new CleanerService(store);
   }
 
+  private SharedCacheUploaderService
+      createNMCacheUploaderSCMProtocolService(SCMStore store) {
+    return new SharedCacheUploaderService(store);
+  }
+
   @Override
   protected void serviceStop() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java
new file mode 100644
index 0000000..f949438
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.SharedCacheUploaderMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+/**
+ * This service handles all rpc calls from the NodeManager uploader to the
+ * shared cache manager.
+ */
+public class SharedCacheUploaderService extends AbstractService
+    implements SCMUploaderProtocol {
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  private Server server;
+  InetSocketAddress bindAddress;
+  private final SCMStore store;
+  private SharedCacheUploaderMetrics metrics;
+
+  public SharedCacheUploaderService(SCMStore store) {
+    super(SharedCacheUploaderService.class.getName());
+    this.store = store;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.bindAddress = getBindAddress(conf);
+
+    super.serviceInit(conf);
+  }
+
+  InetSocketAddress getBindAddress(Configuration conf) {
+    return conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+    this.metrics = SharedCacheUploaderMetrics.initSingleton(conf);
+
+    YarnRPC rpc = YarnRPC.create(conf);
+    this.server =
+        rpc.getServer(SCMUploaderProtocol.class, this, bindAddress,
+            conf, null, // Secret manager null for now (security not supported)
+            conf.getInt(YarnConfiguration.SCM_UPLOADER_SERVER_THREAD_COUNT,
+                YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT));
+
+    // TODO (YARN-2774): Enable service authorization
+
+    this.server.start();
+    bindAddress =
+        conf.updateConnectAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+            server.getListenerAddress());
+
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.server != null) {
+      this.server.stop();
+      this.server = null;
+    }
+
+    super.serviceStop();
+  }
+
+  @Override
+  public SCMUploaderNotifyResponse notify(SCMUploaderNotifyRequest request)
+      throws YarnException, IOException {
+    SCMUploaderNotifyResponse response =
+        recordFactory.newRecordInstance(SCMUploaderNotifyResponse.class);
+
+    // TODO (YARN-2774): proper security/authorization needs to be implemented
+
+    String filename =
+        store.addResource(request.getResourceKey(), request.getFileName());
+
+    boolean accepted = filename.equals(request.getFileName());
+
+    if (accepted) {
+      this.metrics.incAcceptedUploads();
+    } else {
+      this.metrics.incRejectedUploads();
+    }
+
+    response.setAccepted(accepted);
+
+    return response;
+  }
+
+  @Override
+  public SCMUploaderCanUploadResponse canUpload(
+      SCMUploaderCanUploadRequest request) throws YarnException, IOException {
+    // TODO (YARN-2781): we may want to have a more flexible policy of
+    // instructing the node manager to upload only if it meets a certain
+    // criteria
+    // until then we return true for now
+    SCMUploaderCanUploadResponse response =
+        recordFactory.newRecordInstance(SCMUploaderCanUploadResponse.class);
+    response.setUploadable(true);
+    return response;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java
new file mode 100644
index 0000000..6fd816f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class is for maintaining shared cache uploader requests metrics
+ * and publishing them through the metrics interfaces.
+ */
+@Private
+@Evolving
+@Metrics(about="shared cache upload metrics", context="yarn")
+public class SharedCacheUploaderMetrics {
+
+  static final Log LOG =
+      LogFactory.getLog(SharedCacheUploaderMetrics.class);
+  final MetricsRegistry registry;
+
+  SharedCacheUploaderMetrics() {
+    registry = new MetricsRegistry("SharedCacheUploaderRequests");
+    LOG.debug("Initialized "+ registry);
+  }
+
+  enum Singleton {
+    INSTANCE;
+
+    SharedCacheUploaderMetrics impl;
+
+    synchronized SharedCacheUploaderMetrics init(Configuration conf) {
+      if (impl == null) {
+        impl = create();
+      }
+      return impl;
+    }
+  }
+
+  public static SharedCacheUploaderMetrics
+      initSingleton(Configuration conf) {
+    return Singleton.INSTANCE.init(conf);
+  }
+
+  public static SharedCacheUploaderMetrics getInstance() {
+    SharedCacheUploaderMetrics topMetrics = Singleton.INSTANCE.impl;
+    if (topMetrics == null)
+      throw new IllegalStateException(
+          "The SharedCacheUploaderMetrics singleton instance is not"
+          + "initialized. Have you called init first?");
+    return topMetrics;
+  }
+
+  static SharedCacheUploaderMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+
+    SharedCacheUploaderMetrics metrics =
+        new SharedCacheUploaderMetrics();
+    ms.register("SharedCacheUploaderRequests", null, metrics);
+    return metrics;
+  }
+
+  @Metric("Number of accepted uploads") MutableCounterLong acceptedUploads;
+  @Metric("Number of rejected uploads") MutableCounterLong rejectedUploads;
+
+  /**
+   * One accepted upload event
+   */
+  public void incAcceptedUploads() {
+    acceptedUploads.incr();
+  }
+
+  /**
+   * One rejected upload event
+   */
+  public void incRejectedUploads() {
+    rejectedUploads.incr();
+  }
+
+  public long getAcceptedUploads() { return acceptedUploads.value(); }
+  public long getRejectUploads() { return rejectedUploads.value(); }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java
new file mode 100644
index 0000000..1cb0663
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.SharedCacheUploaderMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Basic unit tests for the NodeManger to SCM Protocol Service.
+ */
+public class TestSharedCacheUploaderService {
+  private static File testDir = null;
+
+  @BeforeClass
+  public static void setupTestDirs() throws IOException {
+    testDir = new File("target",
+        TestSharedCacheUploaderService.class.getCanonicalName());
+    testDir.delete();
+    testDir.mkdirs();
+    testDir = testDir.getAbsoluteFile();
+  }
+
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException {
+    if (testDir != null) {
+      testDir.delete();
+    }
+  }
+
+  private SharedCacheUploaderService service;
+  private SCMUploaderProtocol proxy;
+  private SCMStore store;
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  @Before
+  public void startUp() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.SCM_STORE_CLASS,
+        InMemorySCMStore.class.getName());
+    conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());
+    AppChecker appChecker = mock(AppChecker.class);
+    store = new InMemorySCMStore(appChecker);
+    store.init(conf);
+    store.start();
+
+    service = new SharedCacheUploaderService(store);
+    service.init(conf);
+    service.start();
+
+    YarnRPC rpc = YarnRPC.create(new Configuration());
+
+    InetSocketAddress scmAddress =
+        conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
+
+    proxy =
+        (SCMUploaderProtocol) rpc.getProxy(
+            SCMUploaderProtocol.class, scmAddress, conf);
+  }
+
+  @After
+  public void cleanUp() {
+    if (store != null) {
+      store.stop();
+    }
+
+    if (service != null) {
+      service.stop();
+    }
+
+    if (proxy != null) {
+      RPC.stopProxy(proxy);
+    }
+  }
+
+  @Test
+  public void testNotify_noEntry() throws Exception {
+    long accepted =
+        SharedCacheUploaderMetrics.getInstance().getAcceptedUploads();
+
+    SCMUploaderNotifyRequest request =
+        recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+    request.setResourceKey("key1");
+    request.setFilename("foo.jar");
+    assertTrue(proxy.notify(request).getAccepted());
+    Collection<SharedCacheResourceReference> set =
+        store.getResourceReferences("key1");
+    assertNotNull(set);
+    assertEquals(0, set.size());
+
+    assertEquals(
+        "NM upload metrics aren't updated.", 1,
+        SharedCacheUploaderMetrics.getInstance().getAcceptedUploads() -
+            accepted);
+
+  }
+
+  @Test
+  public void testNotify_entryExists_differentName() throws Exception {
+
+    long rejected =
+        SharedCacheUploaderMetrics.getInstance().getRejectUploads();
+
+    store.addResource("key1", "foo.jar");
+    SCMUploaderNotifyRequest request =
+        recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+    request.setResourceKey("key1");
+    request.setFilename("foobar.jar");
+    assertFalse(proxy.notify(request).getAccepted());
+    Collection<SharedCacheResourceReference> set =
+        store.getResourceReferences("key1");
+    assertNotNull(set);
+    assertEquals(0, set.size());
+    assertEquals(
+        "NM upload metrics aren't updated.", 1,
+        SharedCacheUploaderMetrics.getInstance().getRejectUploads() -
+            rejected);
+
+  }
+
+  @Test
+  public void testNotify_entryExists_sameName() throws Exception {
+
+    long accepted =
+        SharedCacheUploaderMetrics.getInstance().getAcceptedUploads();
+
+    store.addResource("key1", "foo.jar");
+    SCMUploaderNotifyRequest request =
+        recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+    request.setResourceKey("key1");
+    request.setFilename("foo.jar");
+    assertTrue(proxy.notify(request).getAccepted());
+    Collection<SharedCacheResourceReference> set =
+        store.getResourceReferences("key1");
+    assertNotNull(set);
+    assertEquals(0, set.size());
+    assertEquals(
+        "NM upload metrics aren't updated.", 1,
+        SharedCacheUploaderMetrics.getInstance().getAcceptedUploads() -
+            accepted);
+
+  }
+}