You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/10/31 21:01:03 UTC
git commit: YARN-2186. [YARN-1492] Node Manager uploader service for
cache manager. (Chris Trezzo and Sangjin Lee via kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk f1a149e91 -> 256697acd
YARN-2186. [YARN-1492] Node Manager uploader service for cache manager. (Chris Trezzo and Sangjin Lee via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/256697ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/256697ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/256697ac
Branch: refs/heads/trunk
Commit: 256697acd5ec16bca022ae86e22f9882b3309d8b
Parents: f1a149e
Author: Karthik Kambatla <ka...@apache.org>
Authored: Fri Oct 31 13:00:42 2014 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Fri Oct 31 13:00:42 2014 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 16 +-
.../src/main/resources/yarn-default.xml | 14 ++
.../hadoop-yarn-server-common/pom.xml | 1 +
.../yarn/server/api/SCMUploaderProtocol.java | 83 ++++++++
.../yarn/server/api/SCMUploaderProtocolPB.java | 28 +++
.../client/SCMUploaderProtocolPBClientImpl.java | 93 +++++++++
.../SCMUploaderProtocolPBServiceImpl.java | 79 ++++++++
.../SCMUploaderCanUploadRequest.java | 49 +++++
.../SCMUploaderCanUploadResponse.java | 52 +++++
.../SCMUploaderNotifyRequest.java | 67 +++++++
.../SCMUploaderNotifyResponse.java | 51 +++++
.../pb/SCMUploaderCanUploadRequestPBImpl.java | 78 ++++++++
.../pb/SCMUploaderCanUploadResponsePBImpl.java | 75 ++++++++
.../impl/pb/SCMUploaderNotifyRequestPBImpl.java | 93 +++++++++
.../pb/SCMUploaderNotifyResponsePBImpl.java | 73 +++++++
.../src/main/proto/SCMUploader.proto | 30 +++
.../yarn_server_common_service_protos.proto | 19 +-
.../sharedcachemanager/SharedCacheManager.java | 9 +
.../SharedCacheUploaderService.java | 140 ++++++++++++++
.../metrics/SharedCacheUploaderMetrics.java | 105 +++++++++++
.../TestSharedCacheUploaderService.java | 188 +++++++++++++++++++
22 files changed, 1344 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 011083c..922b6fa 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -42,6 +42,9 @@ Release 2.7.0 - UNRELEASED
YARN-2183. [YARN-1492] Cleaner service for cache manager.
(Chris Trezzo and Sangjin Lee via kasha)
+ YARN-2186. [YARN-1492] Node Manager uploader service for cache manager.
+ (Chris Trezzo and Sangjin Lee via kasha)
+
IMPROVEMENTS
YARN-1979. TestDirectoryCollection fails when the umask is unusual.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d8ed541..b459ee3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -1446,6 +1445,21 @@ public class YarnConfiguration extends Configuration {
SCM_CLEANER_PREFIX + "resource-sleep-ms";
public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L;
+ /** The address of the node manager interface in the SCM. */
+ public static final String SCM_UPLOADER_SERVER_ADDRESS = SHARED_CACHE_PREFIX
+ + "uploader.server.address";
+ public static final int DEFAULT_SCM_UPLOADER_SERVER_PORT = 8046;
+ public static final String DEFAULT_SCM_UPLOADER_SERVER_ADDRESS = "0.0.0.0:"
+ + DEFAULT_SCM_UPLOADER_SERVER_PORT;
+
+ /**
+ * The number of SCM threads used to handle uploader requests from the node
+ * manager.
+ */
+ public static final String SCM_UPLOADER_SERVER_THREAD_COUNT =
+ SHARED_CACHE_PREFIX + "uploader.server.thread-count";
+ public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50;
+
////////////////////////////////
// Other Configs
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index d02931f..1e7d544 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1434,6 +1434,20 @@
<value>0</value>
</property>
+ <property>
+ <description>The address of the node manager interface in the SCM
+ (shared cache manager)</description>
+ <name>yarn.sharedcache.uploader.server.address</name>
+ <value>0.0.0.0:8046</value>
+ </property>
+
+ <property>
+ <description>The number of threads used to handle shared cache manager
+ requests from the node manager (50 by default)</description>
+ <name>yarn.sharedcache.uploader.server.thread-count</name>
+ <value>50</value>
+ </property>
+
<!-- Other configuration -->
<property>
<description>The interval that the yarn client library uses to poll the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index acf330f..35eacc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -135,6 +135,7 @@
<include>yarn_server_common_service_protos.proto</include>
<include>yarn_server_common_service_protos.proto</include>
<include>ResourceTracker.proto</include>
+ <include>SCMUploader.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java
new file mode 100644
index 0000000..937f648
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocol.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+
+/**
+ * <p>
+ * The protocol between a <code>NodeManager</code>'s
+ * <code>SharedCacheUploadService</code> and the
+ * <code>SharedCacheManager</code>.
+ * </p>
+ */
+@Private
+@Unstable
+public interface SCMUploaderProtocol {
+ /**
+ * <p>
+ * The method used by the NodeManager's <code>SharedCacheUploadService</code>
+ * to notify the shared cache manager of a newly cached resource.
+ * </p>
+ *
+ * <p>
+ * The <code>SharedCacheManager</code> responds with whether or not it has
+ * accepted the resource; if not, the NodeManager should delete the file.
+ * </p>
+ *
+ * @param request identifies the resource that was newly uploaded to the
+ * shared cache
+ * @return response indicating whether the newly uploaded resource was
+ * accepted (if it was not, it should be deleted)
+ * @throws YarnException
+ * @throws IOException
+ */
+ public SCMUploaderNotifyResponse
+ notify(SCMUploaderNotifyRequest request)
+ throws YarnException, IOException;
+
+ /**
+ * <p>
+ * The method used by the NodeManager's <code>SharedCacheUploadService</code>
+ * to ask whether a resource can be uploaded.
+ * </p>
+ *
+ * <p>
+ * The <code>SharedCacheManager</code> responds with whether or not the
+ * NodeManager can upload the file.
+ * </p>
+ *
+ * @param request identifies the resource proposed for upload to the shared cache
+ * @return response indicating if resource can be uploaded to the shared cache
+ * @throws YarnException
+ * @throws IOException
+ */
+ public SCMUploaderCanUploadResponse
+ canUpload(SCMUploaderCanUploadRequest request)
+ throws YarnException, IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java
new file mode 100644
index 0000000..5099b7d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/SCMUploaderProtocolPB.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.SCMUploaderProtocol.SCMUploaderProtocolService;
+
+@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB",
+ protocolVersion = 1)
+public interface SCMUploaderProtocolPB extends
+ SCMUploaderProtocolService.BlockingInterface {
+ // Declares no members of its own; everything comes from BlockingInterface.
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java
new file mode 100644
index 0000000..31e4868
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMUploaderProtocolPBClientImpl.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyResponsePBImpl;
+
+import com.google.protobuf.ServiceException;
+
+public class SCMUploaderProtocolPBClientImpl implements
+ SCMUploaderProtocol, Closeable {
+ // RPC proxy created in the constructor; close() stops it and sets it to null.
+ private SCMUploaderProtocolPB proxy;
+ // Sets ProtobufRpcEngine as the engine for the PB protocol, then gets the proxy.
+ public SCMUploaderProtocolPBClientImpl(long clientVersion,
+ InetSocketAddress addr, Configuration conf) throws IOException {
+ RPC.setProtocolEngine(conf, SCMUploaderProtocolPB.class,
+ ProtobufRpcEngine.class);
+ proxy =
+ RPC.getProxy(SCMUploaderProtocolPB.class, clientVersion, addr, conf);
+ }
+ // Safe to call more than once: the null check skips an already-stopped proxy.
+ @Override
+ public void close() {
+ if (this.proxy != null) {
+ RPC.stopProxy(this.proxy);
+ this.proxy = null;
+ }
+ }
+ // The cast below requires request to be the PB-backed implementation.
+ @Override
+ public SCMUploaderNotifyResponse notify(SCMUploaderNotifyRequest request)
+ throws YarnException, IOException {
+ SCMUploaderNotifyRequestProto requestProto =
+ ((SCMUploaderNotifyRequestPBImpl) request).getProto();
+ try {
+ return new SCMUploaderNotifyResponsePBImpl(proxy.notify(null,
+ requestProto)); // null: no RpcController is passed
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null; // presumably unreachable if the call above always throws; confirm
+ }
+ }
+ // Same pattern as notify: unwrap the proto, call the proxy, wrap the reply.
+ @Override
+ public SCMUploaderCanUploadResponse canUpload(
+ SCMUploaderCanUploadRequest request) throws YarnException, IOException {
+ SCMUploaderCanUploadRequestProto requestProto =
+ ((SCMUploaderCanUploadRequestPBImpl)request).getProto();
+ try {
+ return new SCMUploaderCanUploadResponsePBImpl(proxy.canUpload(null,
+ requestProto)); // null: no RpcController is passed
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null; // presumably unreachable if the call above always throws; confirm
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java
new file mode 100644
index 0000000..db6c58c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMUploaderProtocolPBServiceImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProto;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderCanUploadResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SCMUploaderNotifyResponsePBImpl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class SCMUploaderProtocolPBServiceImpl implements
+ SCMUploaderProtocolPB {
+ // The SCMUploaderProtocol implementation that every call is delegated to.
+ private SCMUploaderProtocol real;
+
+ public SCMUploaderProtocolPBServiceImpl(SCMUploaderProtocol impl) {
+ this.real = impl;
+ }
+ // Wraps proto in a record, delegates to real, and returns the response's proto.
+ @Override
+ public SCMUploaderNotifyResponseProto notify(RpcController controller,
+ SCMUploaderNotifyRequestProto proto) throws ServiceException {
+ SCMUploaderNotifyRequestPBImpl request =
+ new SCMUploaderNotifyRequestPBImpl(proto);
+ try {
+ SCMUploaderNotifyResponse response = real.notify(request);
+ return ((SCMUploaderNotifyResponsePBImpl) response).getProto(); // real must return the PB impl
+ } catch (YarnException e) {
+ throw new ServiceException(e); // rethrown wrapped, as the signature only allows ServiceException
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+ // Same pattern as notify: wrap, delegate, unwrap; failures become ServiceException.
+ @Override
+ public SCMUploaderCanUploadResponseProto canUpload(RpcController controller,
+ SCMUploaderCanUploadRequestProto proto)
+ throws ServiceException {
+ SCMUploaderCanUploadRequestPBImpl request =
+ new SCMUploaderCanUploadRequestPBImpl(proto);
+ try {
+ SCMUploaderCanUploadResponse response = real.canUpload(request);
+ return ((SCMUploaderCanUploadResponsePBImpl)response).getProto(); // real must return the PB impl
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java
new file mode 100644
index 0000000..bb6718a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadRequest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The request from the NodeManager to the <code>SharedCacheManager</code> that
+ * asks whether it can upload a resource to the shared cache.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderCanUploadRequest {
+
+ /**
+ * Get the <code>key</code> of the resource that would be uploaded to the
+ * shared cache.
+ *
+ * @return the <code>key</code> identifying the resource
+ */
+ public abstract String getResourceKey();
+
+ /**
+ * Set the <code>key</code> of the resource that would be uploaded to the
+ * shared cache.
+ *
+ * @param key unique identifier for the resource
+ */
+ public abstract void setResourceKey(String key);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java
new file mode 100644
index 0000000..5fb4626
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderCanUploadResponse.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The response from the SharedCacheManager to the NodeManager that indicates
+ * whether the NodeManager can upload the resource to the shared cache. If the
+ * SCM does not allow it, the NodeManager should not upload the resource to the
+ * shared cache.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderCanUploadResponse {
+
+ /**
+ * Get whether or not the node manager can upload the resource to the shared
+ * cache.
+ *
+ * @return true if the resource can be uploaded, false otherwise
+ */
+ public abstract boolean getUploadable();
+
+ /**
+ * Set whether or not the node manager can upload the resource to the shared
+ * cache.
+ *
+ * @param b true if the resource can be uploaded, false otherwise
+ */
+ public abstract void setUploadable(boolean b);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java
new file mode 100644
index 0000000..c72453c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyRequest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The request from the NodeManager to the <code>SharedCacheManager</code> that
+ * notifies it that a resource has been uploaded to the shared cache. The
+ * <code>SharedCacheManager</code> may reject the resource for various reasons,
+ * in which case the NodeManager should remove it from the shared cache.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderNotifyRequest {
+
+ /**
+ * Get the filename of the resource that was just uploaded to the shared
+ * cache.
+ *
+ * @return the filename
+ */
+ public abstract String getFileName();
+
+ /**
+ * Set the filename of the resource that was just uploaded to the shared
+ * cache. Note the casing differs from the getter, <code>getFileName()</code>.
+ *
+ * @param filename the name of the file
+ */
+ public abstract void setFilename(String filename);
+
+ /**
+ * Get the <code>key</code> of the resource that was just uploaded to the
+ * shared cache.
+ *
+ * @return the <code>key</code> identifying the resource
+ */
+ public abstract String getResourceKey();
+
+ /**
+ * Set the <code>key</code> of the resource that was just uploaded to the
+ * shared cache.
+ *
+ * @param key unique identifier for the resource
+ */
+ public abstract void setResourceKey(String key);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java
new file mode 100644
index 0000000..83d7913
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SCMUploaderNotifyResponse.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The response from the SharedCacheManager to the NodeManager that indicates
+ * whether the NodeManager needs to delete the cached resource it was sending
+ * the notification for.
+ * </p>
+ */
+@Private
+@Unstable
+public abstract class SCMUploaderNotifyResponse {
+
+ /**
+ * Get whether or not the shared cache manager has accepted the notified
+ * resource (i.e. the uploaded file should remain in the cache). When the
+ * resource has not been accepted, the NodeManager is expected to delete the
+ * cached resource it sent the notification for.
+ *
+ * @return boolean True if the resource has been accepted, false otherwise.
+ */
+ public abstract boolean getAccepted();
+
+ /**
+ * Set whether or not the shared cache manager has accepted the notified
+ * resource (i.e. the uploaded file should remain in the cache).
+ *
+ * @param b True if the resource has been accepted, false otherwise.
+ */
+ public abstract void setAccepted(boolean b);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java
new file mode 100644
index 0000000..d350fb9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadRequestPBImpl.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+
+public class SCMUploaderCanUploadRequestPBImpl
+ extends SCMUploaderCanUploadRequest {
+ SCMUploaderCanUploadRequestProto proto =
+ SCMUploaderCanUploadRequestProto.getDefaultInstance();
+ SCMUploaderCanUploadRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public SCMUploaderCanUploadRequestPBImpl() {
+ builder = SCMUploaderCanUploadRequestProto.newBuilder();
+ }
+
+ public SCMUploaderCanUploadRequestPBImpl(
+ SCMUploaderCanUploadRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SCMUploaderCanUploadRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public String getResourceKey() {
+ SCMUploaderCanUploadRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.hasResourceKey()) ? p.getResourceKey() : null;
+ }
+
+ @Override
+ public void setResourceKey(String key) {
+ maybeInitBuilder();
+ if (key == null) {
+ builder.clearResourceKey();
+ return;
+ }
+ builder.setResourceKey(key);
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = SCMUploaderCanUploadRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java
new file mode 100644
index 0000000..3f44e2a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderCanUploadResponsePBImpl.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderCanUploadResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+
+public class SCMUploaderCanUploadResponsePBImpl
+ extends SCMUploaderCanUploadResponse {
+ SCMUploaderCanUploadResponseProto proto =
+ SCMUploaderCanUploadResponseProto.getDefaultInstance();
+ SCMUploaderCanUploadResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public SCMUploaderCanUploadResponsePBImpl() {
+ builder = SCMUploaderCanUploadResponseProto.newBuilder();
+ }
+
+ public SCMUploaderCanUploadResponsePBImpl(
+ SCMUploaderCanUploadResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SCMUploaderCanUploadResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public boolean getUploadable() {
+ SCMUploaderCanUploadResponseProtoOrBuilder p = viaProto ? proto : builder;
+ // Default to true, when in doubt allow the upload
+ return (p.hasUploadable()) ? p.getUploadable() : true;
+ }
+
+ @Override
+ public void setUploadable(boolean b) {
+ maybeInitBuilder();
+ builder.setUploadable(b);
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = SCMUploaderCanUploadResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java
new file mode 100644
index 0000000..9b52b11
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyRequestPBImpl.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+
+/**
+ * Protocol buffer backed implementation of {@link SCMUploaderNotifyRequest}.
+ * The state lives either in the immutable <code>proto</code> message or in
+ * the mutable <code>builder</code>; <code>viaProto</code> records which one
+ * is current.
+ */
+public class SCMUploaderNotifyRequestPBImpl extends SCMUploaderNotifyRequest {
+  SCMUploaderNotifyRequestProto proto =
+      SCMUploaderNotifyRequestProto.getDefaultInstance();
+  SCMUploaderNotifyRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  /** Creates an empty request whose state is held in a fresh builder. */
+  public SCMUploaderNotifyRequestPBImpl() {
+    builder = SCMUploaderNotifyRequestProto.newBuilder();
+  }
+
+  /**
+   * Wraps an existing protocol buffer message.
+   *
+   * @param proto the message this record reads from
+   */
+  public SCMUploaderNotifyRequestPBImpl(
+      SCMUploaderNotifyRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Builds the protocol buffer message for this record. After the call the
+   * record reads from that message rather than from the builder.
+   *
+   * @return the message holding the current state
+   */
+  public SCMUploaderNotifyRequestProto getProto() {
+    mergeLocalToProto();
+    if (!viaProto) {
+      proto = builder.build();
+    }
+    viaProto = true;
+    return proto;
+  }
+
+  /**
+   * @return the resource key, or <code>null</code> if none has been set
+   */
+  @Override
+  public String getResourceKey() {
+    SCMUploaderNotifyRequestProtoOrBuilder source = current();
+    if (!source.hasResourceKey()) {
+      return null;
+    }
+    return source.getResourceKey();
+  }
+
+  /**
+   * @param key the resource key; <code>null</code> clears the field
+   */
+  @Override
+  public void setResourceKey(String key) {
+    maybeInitBuilder();
+    if (key != null) {
+      builder.setResourceKey(key);
+    } else {
+      builder.clearResourceKey();
+    }
+  }
+
+  /**
+   * @return the filename, or <code>null</code> if none has been set
+   */
+  @Override
+  public String getFileName() {
+    SCMUploaderNotifyRequestProtoOrBuilder source = current();
+    if (!source.hasFilename()) {
+      return null;
+    }
+    return source.getFilename();
+  }
+
+  /**
+   * @param filename the filename; <code>null</code> clears the field
+   */
+  @Override
+  public void setFilename(String filename) {
+    maybeInitBuilder();
+    if (filename != null) {
+      builder.setFilename(filename);
+    } else {
+      builder.clearFilename();
+    }
+  }
+
+  /** Picks the message or the builder, whichever holds the current state. */
+  private SCMUploaderNotifyRequestProtoOrBuilder current() {
+    if (viaProto) {
+      return proto;
+    }
+    return builder;
+  }
+
+  /** Rebuilds <code>proto</code> from the builder and marks it current. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /**
+   * Makes the builder the current holder of state, seeding a new builder
+   * from <code>proto</code> when the message was current or no builder
+   * exists yet.
+   */
+  private void maybeInitBuilder() {
+    boolean needsNewBuilder = viaProto || builder == null;
+    if (needsNewBuilder) {
+      builder = SCMUploaderNotifyRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java
new file mode 100644
index 0000000..c899bbe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SCMUploaderNotifyResponsePBImpl.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SCMUploaderNotifyResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+
+public class SCMUploaderNotifyResponsePBImpl extends SCMUploaderNotifyResponse {
+ SCMUploaderNotifyResponseProto proto =
+ SCMUploaderNotifyResponseProto.getDefaultInstance();
+ SCMUploaderNotifyResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public SCMUploaderNotifyResponsePBImpl() {
+ builder = SCMUploaderNotifyResponseProto.newBuilder();
+ }
+
+ public SCMUploaderNotifyResponsePBImpl(SCMUploaderNotifyResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SCMUploaderNotifyResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public boolean getAccepted() {
+ SCMUploaderNotifyResponseProtoOrBuilder p = viaProto ? proto : builder;
+ // Default to true, when in doubt just leave the file in the cache
+ return (p.hasAccepted()) ? p.getAccepted() : true;
+ }
+
+ @Override
+ public void setAccepted(boolean b) {
+ maybeInitBuilder();
+ builder.setAccepted(b);
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = SCMUploaderNotifyResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto
new file mode 100644
index 0000000..2278422
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/SCMUploader.proto
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "SCMUploaderProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_server_common_service_protos.proto";
+
+// RPC service used by the NodeManager uploader to talk to the shared cache
+// manager.
+service SCMUploaderProtocolService {
+ // Tells the shared cache manager that a resource has been uploaded; the
+ // response says whether the resource was accepted.
+ rpc notify(SCMUploaderNotifyRequestProto) returns (SCMUploaderNotifyResponseProto);
+ // Asks the shared cache manager whether a resource may be uploaded.
+ rpc canUpload(SCMUploaderCanUploadRequestProto) returns (SCMUploaderCanUploadResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index f2d01ad..91473c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -75,4 +75,21 @@ message NMContainerStatusProto {
optional string diagnostics = 5 [default = "N/A"];
optional int32 container_exit_status = 6;
optional int64 creation_time = 7;
-}
\ No newline at end of file
+}
+
+// Sent by the NodeManager once a resource has been uploaded to the shared
+// cache.
+message SCMUploaderNotifyRequestProto {
+ // Key (unique identifier) of the uploaded resource.
+ optional string resource_key = 1;
+ // Filename of the uploaded resource.
+ optional string filename = 2;
+}
+
+// Reply to a notify call.
+message SCMUploaderNotifyResponseProto {
+ // Whether the shared cache manager accepted the resource. The Java record
+ // wrapper reports true when this field is unset.
+ optional bool accepted = 1;
+}
+
+// Sent to ask whether a resource may be uploaded to the shared cache.
+message SCMUploaderCanUploadRequestProto {
+ // Key of the resource in question.
+ optional string resource_key = 1;
+}
+
+// Reply to a canUpload call.
+message SCMUploaderCanUploadResponseProto {
+ // Whether the upload is allowed. The Java record wrapper reports true when
+ // this field is unset.
+ optional bool uploadable = 1;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
index 3fdb588..ab50727 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -67,6 +67,10 @@ public class SharedCacheManager extends CompositeService {
CleanerService cs = createCleanerService(store);
addService(cs);
+ SharedCacheUploaderService nms =
+ createNMCacheUploaderSCMProtocolService(store);
+ addService(nms);
+
// init metrics
DefaultMetricsSystem.initialize("SharedCacheManager");
JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -97,6 +101,11 @@ public class SharedCacheManager extends CompositeService {
return new CleanerService(store);
}
+ /**
+ * Creates the uploader service for this shared cache manager.
+ *
+ * @param store the store handed to the new service
+ * @return a new <code>SharedCacheUploaderService</code> backed by the store
+ */
+ private SharedCacheUploaderService
+ createNMCacheUploaderSCMProtocolService(SCMStore store) {
+ return new SharedCacheUploaderService(store);
+ }
+
@Override
protected void serviceStop() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java
new file mode 100644
index 0000000..f949438
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheUploaderService.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderCanUploadResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.SharedCacheUploaderMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+/**
+ * Shared cache manager service that answers every rpc call made by the
+ * NodeManager uploader.
+ */
+public class SharedCacheUploaderService extends AbstractService
+    implements SCMUploaderProtocol {
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  private Server server;
+  InetSocketAddress bindAddress;
+  private final SCMStore store;
+  private SharedCacheUploaderMetrics metrics;
+
+  /**
+   * @param store the store that notified resources are added to
+   */
+  public SharedCacheUploaderService(SCMStore store) {
+    super(SharedCacheUploaderService.class.getName());
+    this.store = store;
+  }
+
+  /**
+   * Resolves the bind address from the configuration, then continues with
+   * the regular service initialization.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    bindAddress = getBindAddress(conf);
+
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Reads the uploader server address from the configuration.
+   *
+   * @param conf the configuration to read from
+   * @return the configured address, or the default address and port
+   */
+  InetSocketAddress getBindAddress(Configuration conf) {
+    InetSocketAddress address =
+        conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
+    return address;
+  }
+
+  /**
+   * Initializes the uploader metrics, starts the rpc server on the bind
+   * address and stores the address the server is actually listening on.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+    this.metrics = SharedCacheUploaderMetrics.initSingleton(conf);
+
+    YarnRPC rpc = YarnRPC.create(conf);
+    int threadCount =
+        conf.getInt(YarnConfiguration.SCM_UPLOADER_SERVER_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT);
+    // Secret manager null for now (security not supported)
+    this.server = rpc.getServer(SCMUploaderProtocol.class, this, bindAddress,
+        conf, null, threadCount);
+
+    // TODO (YARN-2774): Enable service authorization
+
+    this.server.start();
+    InetSocketAddress listenerAddress = server.getListenerAddress();
+    bindAddress = conf.updateConnectAddr(
+        YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS, listenerAddress);
+
+    super.serviceStart();
+  }
+
+  /** Stops the rpc server if one is running, then stops the service. */
+  @Override
+  protected void serviceStop() throws Exception {
+    Server running = this.server;
+    if (running != null) {
+      running.stop();
+      this.server = null;
+    }
+
+    super.serviceStop();
+  }
+
+  /**
+   * Adds the notified resource to the store. The upload counts as accepted
+   * when the filename returned by the store equals the filename carried by
+   * the request; the matching metrics counter is incremented either way.
+   *
+   * @param request carries the resource key and the uploaded filename
+   * @return a response whose accepted flag reflects the outcome
+   */
+  @Override
+  public SCMUploaderNotifyResponse notify(SCMUploaderNotifyRequest request)
+      throws YarnException, IOException {
+    SCMUploaderNotifyResponse response =
+        recordFactory.newRecordInstance(SCMUploaderNotifyResponse.class);
+
+    // TODO (YARN-2774): proper security/authorization needs to be implemented
+
+    String resourceKey = request.getResourceKey();
+    String uploadedName = request.getFileName();
+    String storedName = store.addResource(resourceKey, uploadedName);
+
+    boolean accepted = storedName.equals(request.getFileName());
+
+    if (!accepted) {
+      this.metrics.incRejectedUploads();
+    } else {
+      this.metrics.incAcceptedUploads();
+    }
+
+    response.setAccepted(accepted);
+
+    return response;
+  }
+
+  /**
+   * Answers whether the NodeManager may upload a resource. The answer is
+   * currently always yes.
+   *
+   * @param request the request (not inspected)
+   * @return a response with the uploadable flag set to true
+   */
+  @Override
+  public SCMUploaderCanUploadResponse canUpload(
+      SCMUploaderCanUploadRequest request) throws YarnException, IOException {
+    // TODO (YARN-2781): we may want to have a more flexible policy of
+    // instructing the node manager to upload only if it meets a certain
+    // criteria
+    // until then we return true for now
+    SCMUploaderCanUploadResponse reply =
+        recordFactory.newRecordInstance(SCMUploaderCanUploadResponse.class);
+    reply.setUploadable(true);
+    return reply;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java
new file mode 100644
index 0000000..6fd816f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/SharedCacheUploaderMetrics.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class is for maintaining shared cache uploader requests metrics
+ * and publishing them through the metrics interfaces. A single instance is
+ * shared per process: create it with {@link #initSingleton(Configuration)}
+ * and fetch it afterwards with {@link #getInstance()}.
+ */
+@Private
+@Evolving
+@Metrics(about="shared cache upload metrics", context="yarn")
+public class SharedCacheUploaderMetrics {
+
+  static final Log LOG =
+      LogFactory.getLog(SharedCacheUploaderMetrics.class);
+  final MetricsRegistry registry;
+
+  SharedCacheUploaderMetrics() {
+    registry = new MetricsRegistry("SharedCacheUploaderRequests");
+    LOG.debug("Initialized "+ registry);
+  }
+
+  /** Holder of the single metrics instance. */
+  enum Singleton {
+    INSTANCE;
+
+    SharedCacheUploaderMetrics impl;
+
+    /**
+     * Creates the metrics instance on the first call and returns the same
+     * instance on every later call.
+     *
+     * @param conf the configuration (currently not read)
+     * @return the shared metrics instance
+     */
+    synchronized SharedCacheUploaderMetrics init(Configuration conf) {
+      if (impl == null) {
+        impl = create();
+      }
+      return impl;
+    }
+  }
+
+  /**
+   * Initializes the shared metrics instance if that has not happened yet.
+   *
+   * @param conf the configuration (currently not read)
+   * @return the shared metrics instance
+   */
+  public static SharedCacheUploaderMetrics
+      initSingleton(Configuration conf) {
+    return Singleton.INSTANCE.init(conf);
+  }
+
+  /**
+   * Returns the shared metrics instance.
+   *
+   * @return the shared metrics instance
+   * @throws IllegalStateException if the instance has not been initialized
+   */
+  public static SharedCacheUploaderMetrics getInstance() {
+    SharedCacheUploaderMetrics topMetrics = Singleton.INSTANCE.impl;
+    if (topMetrics == null) {
+      // The two literals are concatenated, so the first one must end with a
+      // space to keep "not" and "initialized" apart.
+      throw new IllegalStateException(
+          "The SharedCacheUploaderMetrics singleton instance is not "
+          + "initialized. Have you called init first?");
+    }
+    return topMetrics;
+  }
+
+  /**
+   * Creates a metrics object and registers it with the default metrics
+   * system under the name "SharedCacheUploaderRequests".
+   *
+   * @return the newly registered metrics object
+   */
+  static SharedCacheUploaderMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+
+    SharedCacheUploaderMetrics metrics =
+        new SharedCacheUploaderMetrics();
+    ms.register("SharedCacheUploaderRequests", null, metrics);
+    return metrics;
+  }
+
+  @Metric("Number of accepted uploads") MutableCounterLong acceptedUploads;
+  @Metric("Number of rejected uploads") MutableCounterLong rejectedUploads;
+
+  /**
+   * One accepted upload event
+   */
+  public void incAcceptedUploads() {
+    acceptedUploads.incr();
+  }
+
+  /**
+   * One rejected upload event
+   */
+  public void incRejectedUploads() {
+    rejectedUploads.incr();
+  }
+
+  /** @return the number of accepted uploads counted so far */
+  public long getAcceptedUploads() { return acceptedUploads.value(); }
+
+  /** @return the number of rejected uploads counted so far */
+  public long getRejectUploads() { return rejectedUploads.value(); }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/256697ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java
new file mode 100644
index 0000000..1cb0663
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheUploaderService.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.SharedCacheUploaderMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Basic unit tests for the NodeManger to SCM Protocol Service.
+ */
+public class TestSharedCacheUploaderService {
+ private static File testDir = null;
+
+ @BeforeClass
+ public static void setupTestDirs() throws IOException {
+ testDir = new File("target",
+ TestSharedCacheUploaderService.class.getCanonicalName());
+ testDir.delete();
+ testDir.mkdirs();
+ testDir = testDir.getAbsoluteFile();
+ }
+
+ @AfterClass
+ public static void cleanupTestDirs() throws IOException {
+ if (testDir != null) {
+ testDir.delete();
+ }
+ }
+
+ private SharedCacheUploaderService service;
+ private SCMUploaderProtocol proxy;
+ private SCMStore store;
+ private final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ @Before
+ public void startUp() {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.SCM_STORE_CLASS,
+ InMemorySCMStore.class.getName());
+ conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());
+ AppChecker appChecker = mock(AppChecker.class);
+ store = new InMemorySCMStore(appChecker);
+ store.init(conf);
+ store.start();
+
+ service = new SharedCacheUploaderService(store);
+ service.init(conf);
+ service.start();
+
+ YarnRPC rpc = YarnRPC.create(new Configuration());
+
+ InetSocketAddress scmAddress =
+ conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
+
+ proxy =
+ (SCMUploaderProtocol) rpc.getProxy(
+ SCMUploaderProtocol.class, scmAddress, conf);
+ }
+
+ @After
+ public void cleanUp() {
+ if (store != null) {
+ store.stop();
+ }
+
+ if (service != null) {
+ service.stop();
+ }
+
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
+ }
+
+ @Test
+ public void testNotify_noEntry() throws Exception {
+ long accepted =
+ SharedCacheUploaderMetrics.getInstance().getAcceptedUploads();
+
+ SCMUploaderNotifyRequest request =
+ recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+ request.setResourceKey("key1");
+ request.setFilename("foo.jar");
+ assertTrue(proxy.notify(request).getAccepted());
+ Collection<SharedCacheResourceReference> set =
+ store.getResourceReferences("key1");
+ assertNotNull(set);
+ assertEquals(0, set.size());
+
+ assertEquals(
+ "NM upload metrics aren't updated.", 1,
+ SharedCacheUploaderMetrics.getInstance().getAcceptedUploads() -
+ accepted);
+
+ }
+
+ @Test
+ public void testNotify_entryExists_differentName() throws Exception {
+
+ long rejected =
+ SharedCacheUploaderMetrics.getInstance().getRejectUploads();
+
+ store.addResource("key1", "foo.jar");
+ SCMUploaderNotifyRequest request =
+ recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+ request.setResourceKey("key1");
+ request.setFilename("foobar.jar");
+ assertFalse(proxy.notify(request).getAccepted());
+ Collection<SharedCacheResourceReference> set =
+ store.getResourceReferences("key1");
+ assertNotNull(set);
+ assertEquals(0, set.size());
+ assertEquals(
+ "NM upload metrics aren't updated.", 1,
+ SharedCacheUploaderMetrics.getInstance().getRejectUploads() -
+ rejected);
+
+ }
+
+ @Test
+ public void testNotify_entryExists_sameName() throws Exception {
+
+ long accepted =
+ SharedCacheUploaderMetrics.getInstance().getAcceptedUploads();
+
+ store.addResource("key1", "foo.jar");
+ SCMUploaderNotifyRequest request =
+ recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
+ request.setResourceKey("key1");
+ request.setFilename("foo.jar");
+ assertTrue(proxy.notify(request).getAccepted());
+ Collection<SharedCacheResourceReference> set =
+ store.getResourceReferences("key1");
+ assertNotNull(set);
+ assertEquals(0, set.size());
+ assertEquals(
+ "NM upload metrics aren't updated.", 1,
+ SharedCacheUploaderMetrics.getInstance().getAcceptedUploads() -
+ accepted);
+
+ }
+}