You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/11/26 01:22:01 UTC
hadoop git commit: YARN-2188. [YARN-1492] Client service for cache
manager. (Chris Trezzo and Sangjin Lee via kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk 8a7ca13b1 -> fe1f2db5e
YARN-2188. [YARN-1492] Client service for cache manager. (Chris Trezzo and Sangjin Lee via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fe1f2db5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fe1f2db5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fe1f2db5
Branch: refs/heads/trunk
Commit: fe1f2db5ee13920925ee4728dfbbb48fe670ee14
Parents: 8a7ca13
Author: Karthik Kambatla <ka...@apache.org>
Authored: Tue Nov 25 16:21:29 2014 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Tue Nov 25 16:21:46 2014 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 5 +-
.../hadoop-yarn/hadoop-yarn-api/pom.xml | 1 +
.../hadoop/yarn/api/ClientSCMProtocol.java | 90 ++++++
.../hadoop/yarn/api/ClientSCMProtocolPB.java | 28 ++
.../ReleaseSharedCacheResourceRequest.java | 67 +++++
.../ReleaseSharedCacheResourceResponse.java | 37 +++
.../UseSharedCacheResourceRequest.java | 70 +++++
.../UseSharedCacheResourceResponse.java | 55 ++++
.../hadoop/yarn/conf/YarnConfiguration.java | 12 +
.../src/main/proto/client_SCM_protocol.proto | 30 ++
.../src/main/proto/yarn_service_protos.proto | 21 ++
.../client/ClientSCMProtocolPBClientImpl.java | 93 +++++++
.../service/ClientSCMProtocolPBServiceImpl.java | 78 ++++++
...ReleaseSharedCacheResourceRequestPBImpl.java | 122 ++++++++
...eleaseSharedCacheResourceResponsePBImpl.java | 53 ++++
.../pb/UseSharedCacheResourceRequestPBImpl.java | 120 ++++++++
.../UseSharedCacheResourceResponsePBImpl.java | 79 ++++++
.../src/main/resources/yarn-default.xml | 23 +-
.../ClientProtocolService.java | 192 +++++++++++++
.../sharedcachemanager/SharedCacheManager.java | 7 +
.../metrics/ClientSCMMetrics.java | 113 ++++++++
.../TestClientSCMProtocolService.java | 278 +++++++++++++++++++
22 files changed, 1570 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 095bfc0..e9fa63e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -46,7 +46,10 @@ Release 2.7.0 - UNRELEASED
(Chris Trezzo and Sangjin Lee via kasha)
YARN-2236. [YARN-1492] Shared Cache uploader service on the Node
- Manager. (Chris Trezzo and Sanjin Lee via kasha)
+ Manager. (Chris Trezzo and Sangjin Lee via kasha)
+
+ YARN-2188. [YARN-1492] Client service for cache manager.
+ (Chris Trezzo and Sangjin Lee via kasha)
IMPROVEMENTS
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
index affbe03..5e2278d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
@@ -96,6 +96,7 @@
<include>server/resourcemanager_administration_protocol.proto</include>
<include>application_history_client.proto</include>
<include>server/application_history_server.proto</include>
+ <include>client_SCM_protocol.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java
new file mode 100644
index 0000000..d63fa11
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * <p>
+ * The protocol between clients and the <code>SharedCacheManager</code> to claim
+ * and release resources in the shared cache.
+ * </p>
+ */
+@Public
+@Unstable
+public interface ClientSCMProtocol {
+ /**
+ * <p>
+ * The interface used by clients to claim a resource with the
+ * <code>SharedCacheManager.</code> The client uses a checksum to identify the
+ * resource and an {@link ApplicationId} to identify which application will be
+ * using the resource.
+ * </p>
+ *
+ * <p>
+ * The <code>SharedCacheManager</code> responds with whether or not the
+ * resource exists in the cache. If the resource exists, a <code>Path</code>
+ * to the resource in the shared cache is returned. If the resource does not
+ * exist, the response is empty.
+ * </p>
+ *
+ * @param request request to claim a resource in the shared cache
+ * @return response indicating if the resource is already in the cache
+ * @throws YarnException
+ * @throws IOException
+ */
+ public UseSharedCacheResourceResponse use(
+ UseSharedCacheResourceRequest request) throws YarnException, IOException;
+
+ /**
+ * <p>
+ * The interface used by clients to release a resource with the
+ * <code>SharedCacheManager.</code> This method is called once an application
+ * is no longer using a claimed resource in the shared cache. The client uses
+ * a checksum to identify the resource and an {@link ApplicationId} to
+ * identify which application is releasing the resource.
+ * </p>
+ *
+ * <p>
+ * Note: This method is an optimization and the client is not required to call
+ * it for correctness.
+ * </p>
+ *
+ * <p>
+ * Currently the <code>SharedCacheManager</code> sends an empty response.
+ * </p>
+ *
+ * @param request request to release a resource in the shared cache
+ * @return (empty) response on releasing the resource
+ * @throws YarnException
+ * @throws IOException
+ */
+ public ReleaseSharedCacheResourceResponse release(
+ ReleaseSharedCacheResourceRequest request) throws YarnException, IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java
new file mode 100644
index 0000000..b0a9fb5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.ClientSCMProtocol.ClientSCMProtocolService;
+
+@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ClientSCMProtocolPB",
+ protocolVersion = 1)
+public interface ClientSCMProtocolPB extends
+ ClientSCMProtocolService.BlockingInterface {
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java
new file mode 100644
index 0000000..a8d36b9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * <p>The request from clients to release a resource in the shared cache.</p>
+ */
+@Public
+@Unstable
+public abstract class ReleaseSharedCacheResourceRequest {
+
+ /**
+ * Get the <code>ApplicationId</code> of the resource to be released.
+ *
+ * @return <code>ApplicationId</code>
+ */
+ @Public
+ @Unstable
+ public abstract ApplicationId getAppId();
+
+ /**
+ * Set the <code>ApplicationId</code> of the resource to be released.
+ *
+ * @param id <code>ApplicationId</code>
+ */
+ @Public
+ @Unstable
+ public abstract void setAppId(ApplicationId id);
+
+ /**
+ * Get the <code>key</code> of the resource to be released.
+ *
+ * @return <code>key</code>
+ */
+ @Public
+ @Unstable
+ public abstract String getResourceKey();
+
+ /**
+ * Set the <code>key</code> of the resource to be released.
+ *
+ * @param key unique identifier for the resource
+ */
+ @Public
+ @Unstable
+ public abstract void setResourceKey(String key);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java
new file mode 100644
index 0000000..c075e74
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The response to clients from the <code>SharedCacheManager</code> when
+ * releasing a resource in the shared cache.
+ * </p>
+ *
+ * <p>
+ * Currently, this is empty.
+ * </p>
+ */
+@Public
+@Unstable
+public abstract class ReleaseSharedCacheResourceResponse {
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java
new file mode 100644
index 0000000..bd42b7d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * <p>
+ * The request from clients to the <code>SharedCacheManager</code> that claims a
+ * resource in the shared cache.
+ * </p>
+ */
+@Public
+@Unstable
+public abstract class UseSharedCacheResourceRequest {
+
+ /**
+ * Get the <code>ApplicationId</code> of the resource to be used.
+ *
+ * @return <code>ApplicationId</code>
+ */
+ @Public
+ @Unstable
+ public abstract ApplicationId getAppId();
+
+ /**
+ * Set the <code>ApplicationId</code> of the resource to be used.
+ *
+ * @param id <code>ApplicationId</code>
+ */
+ @Public
+ @Unstable
+ public abstract void setAppId(ApplicationId id);
+
+ /**
+ * Get the <code>key</code> of the resource to be used.
+ *
+ * @return <code>key</code>
+ */
+ @Public
+ @Unstable
+ public abstract String getResourceKey();
+
+ /**
+ * Set the <code>key</code> of the resource to be used.
+ *
+ * @param key unique identifier for the resource
+ */
+ @Public
+ @Unstable
+ public abstract void setResourceKey(String key);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java
new file mode 100644
index 0000000..87fb43b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * The response from the SharedCacheManager to the client that indicates whether
+ * a requested resource exists in the cache.
+ * </p>
+ */
+@Public
+@Unstable
+public abstract class UseSharedCacheResourceResponse {
+
+ /**
+ * Get the <code>Path</code> corresponding to the requested resource in the
+ * shared cache.
+ *
+ * @return String A <code>Path</code> if the resource exists in the shared
+ * cache, <code>null</code> otherwise
+ */
+ @Public
+ @Unstable
+ public abstract String getPath();
+
+ /**
+ * Set the <code>Path</code> corresponding to a resource in the shared cache.
+ *
+ * @param p A <code>Path</code> corresponding to a resource in the shared
+ * cache
+ */
+ @Public
+ @Unstable
+ public abstract void setPath(String p);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4b4f581..52bc821 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1472,6 +1472,18 @@ public class YarnConfiguration extends Configuration {
SHARED_CACHE_PREFIX + "uploader.server.thread-count";
public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50;
+ /** The address of the client interface in the SCM. */
+ public static final String SCM_CLIENT_SERVER_ADDRESS =
+ SHARED_CACHE_PREFIX + "client-server.address";
+ public static final int DEFAULT_SCM_CLIENT_SERVER_PORT = 8045;
+ public static final String DEFAULT_SCM_CLIENT_SERVER_ADDRESS = "0.0.0.0:"
+ + DEFAULT_SCM_CLIENT_SERVER_PORT;
+
+ /** The number of threads used to handle shared cache manager requests. */
+ public static final String SCM_CLIENT_SERVER_THREAD_COUNT =
+ SHARED_CACHE_PREFIX + "client-server.thread-count";
+ public static final int DEFAULT_SCM_CLIENT_SERVER_THREAD_COUNT = 50;
+
/** the checksum algorithm implementation **/
public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL =
SHARED_CACHE_PREFIX + "checksum.algo.impl";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto
new file mode 100644
index 0000000..fbc3c42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "ClientSCMProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_service_protos.proto";
+
+service ClientSCMProtocolService {
+ rpc use (UseSharedCacheResourceRequestProto) returns (UseSharedCacheResourceResponseProto);
+ rpc release (ReleaseSharedCacheResourceRequestProto) returns (ReleaseSharedCacheResourceResponseProto);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 1bde69a..10f5b9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -307,6 +307,27 @@ message GetContainersResponseProto {
}
//////////////////////////////////////////////////////
+/////// client_SCM_Protocol //////////////////////////
+//////////////////////////////////////////////////////
+
+message UseSharedCacheResourceRequestProto {
+ optional ApplicationIdProto applicationId = 1;
+ optional string resourceKey = 2;
+}
+
+message UseSharedCacheResourceResponseProto {
+ optional string path = 1;
+}
+
+message ReleaseSharedCacheResourceRequestProto {
+ optional ApplicationIdProto applicationId = 1;
+ optional string resourceKey = 2;
+}
+
+message ReleaseSharedCacheResourceResponseProto {
+}
+
+//////////////////////////////////////////////////////
// reservation_protocol
//////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java
new file mode 100644
index 0000000..79bfaca
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.impl.pb.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.ClientSCMProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto;
+
+import com.google.protobuf.ServiceException;
+
+public class ClientSCMProtocolPBClientImpl implements ClientSCMProtocol,
+ Closeable {
+
+ private ClientSCMProtocolPB proxy;
+
+ public ClientSCMProtocolPBClientImpl(long clientVersion,
+ InetSocketAddress addr, Configuration conf) throws IOException {
+ RPC.setProtocolEngine(conf, ClientSCMProtocolPB.class,
+ ProtobufRpcEngine.class);
+ proxy = RPC.getProxy(ClientSCMProtocolPB.class, clientVersion, addr, conf);
+ }
+
+ @Override
+ public void close() {
+ if (this.proxy != null) {
+ RPC.stopProxy(this.proxy);
+ this.proxy = null;
+ }
+ }
+
+ @Override
+ public UseSharedCacheResourceResponse use(
+ UseSharedCacheResourceRequest request) throws YarnException, IOException {
+ UseSharedCacheResourceRequestProto requestProto =
+ ((UseSharedCacheResourceRequestPBImpl) request).getProto();
+ try {
+ return new UseSharedCacheResourceResponsePBImpl(proxy.use(null,
+ requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public ReleaseSharedCacheResourceResponse release(
+ ReleaseSharedCacheResourceRequest request) throws YarnException,
+ IOException {
+ ReleaseSharedCacheResourceRequestProto requestProto =
+ ((ReleaseSharedCacheResourceRequestPBImpl) request).getProto();
+ try {
+ return new ReleaseSharedCacheResourceResponsePBImpl(proxy.release(null,
+ requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java
new file mode 100644
index 0000000..65b3581
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.impl.pb.service;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.ClientSCMProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class ClientSCMProtocolPBServiceImpl implements ClientSCMProtocolPB {
+
+ private ClientSCMProtocol real;
+
+ public ClientSCMProtocolPBServiceImpl(ClientSCMProtocol impl) {
+ this.real = impl;
+ }
+
+ @Override
+ public UseSharedCacheResourceResponseProto use(RpcController controller,
+ UseSharedCacheResourceRequestProto proto) throws ServiceException {
+ UseSharedCacheResourceRequestPBImpl request =
+ new UseSharedCacheResourceRequestPBImpl(proto);
+ try {
+ UseSharedCacheResourceResponse response = real.use(request);
+ return ((UseSharedCacheResourceResponsePBImpl) response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public ReleaseSharedCacheResourceResponseProto release(
+ RpcController controller, ReleaseSharedCacheResourceRequestProto proto)
+ throws ServiceException {
+ ReleaseSharedCacheResourceRequestPBImpl request =
+ new ReleaseSharedCacheResourceRequestPBImpl(proto);
+ try {
+ ReleaseSharedCacheResourceResponse response = real.release(request);
+ return ((ReleaseSharedCacheResourceResponsePBImpl) response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java
new file mode 100644
index 0000000..d16ef78
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProtoOrBuilder;
+
+public class ReleaseSharedCacheResourceRequestPBImpl extends
+ ReleaseSharedCacheResourceRequest {
+ ReleaseSharedCacheResourceRequestProto proto =
+ ReleaseSharedCacheResourceRequestProto.getDefaultInstance();
+ ReleaseSharedCacheResourceRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private ApplicationId applicationId = null;
+
+ public ReleaseSharedCacheResourceRequestPBImpl() {
+ builder = ReleaseSharedCacheResourceRequestProto.newBuilder();
+ }
+
+ public ReleaseSharedCacheResourceRequestPBImpl(
+ ReleaseSharedCacheResourceRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ReleaseSharedCacheResourceRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public ApplicationId getAppId() {
+ ReleaseSharedCacheResourceRequestProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (this.applicationId != null) {
+ return this.applicationId;
+ }
+ if (!p.hasApplicationId()) {
+ return null;
+ }
+ this.applicationId = convertFromProtoFormat(p.getApplicationId());
+ return this.applicationId;
+ }
+
+ @Override
+ public void setAppId(ApplicationId id) {
+ maybeInitBuilder();
+ if (id == null)
+ builder.clearApplicationId();
+ this.applicationId = id;
+ }
+
+ @Override
+ public String getResourceKey() {
+ ReleaseSharedCacheResourceRequestProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return (p.hasResourceKey()) ? p.getResourceKey() : null;
+ }
+
+ @Override
+ public void setResourceKey(String key) {
+ maybeInitBuilder();
+ if (key == null) {
+ builder.clearResourceKey();
+ return;
+ }
+ builder.setResourceKey(key);
+ }
+
+ private void mergeLocalToBuilder() {
+ if (applicationId != null) {
+ builder.setApplicationId(convertToProtoFormat(this.applicationId));
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ReleaseSharedCacheResourceRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+ return new ApplicationIdPBImpl(p);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl) t).getProto();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java
new file mode 100644
index 0000000..559f2c8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto;
+
+public class ReleaseSharedCacheResourceResponsePBImpl extends
+ ReleaseSharedCacheResourceResponse {
+ ReleaseSharedCacheResourceResponseProto proto =
+ ReleaseSharedCacheResourceResponseProto.getDefaultInstance();
+ ReleaseSharedCacheResourceResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public ReleaseSharedCacheResourceResponsePBImpl() {
+ builder = ReleaseSharedCacheResourceResponseProto.newBuilder();
+ }
+
+ public ReleaseSharedCacheResourceResponsePBImpl(
+ ReleaseSharedCacheResourceResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ReleaseSharedCacheResourceResponseProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ReleaseSharedCacheResourceResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java
new file mode 100644
index 0000000..2a134b3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProtoOrBuilder;
+
+public class UseSharedCacheResourceRequestPBImpl extends
+ UseSharedCacheResourceRequest {
+ UseSharedCacheResourceRequestProto proto = UseSharedCacheResourceRequestProto
+ .getDefaultInstance();
+ UseSharedCacheResourceRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private ApplicationId applicationId = null;
+
+ public UseSharedCacheResourceRequestPBImpl() {
+ builder = UseSharedCacheResourceRequestProto.newBuilder();
+ }
+
+ public UseSharedCacheResourceRequestPBImpl(
+ UseSharedCacheResourceRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public UseSharedCacheResourceRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public ApplicationId getAppId() {
+ UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.applicationId != null) {
+ return this.applicationId;
+ }
+ if (!p.hasApplicationId()) {
+ return null;
+ }
+ this.applicationId = convertFromProtoFormat(p.getApplicationId());
+ return this.applicationId;
+ }
+
+ @Override
+ public void setAppId(ApplicationId id) {
+ maybeInitBuilder();
+ if (id == null)
+ builder.clearApplicationId();
+ this.applicationId = id;
+ }
+
+ @Override
+ public String getResourceKey() {
+ UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.hasResourceKey()) ? p.getResourceKey() : null;
+ }
+
+ @Override
+ public void setResourceKey(String key) {
+ maybeInitBuilder();
+ if (key == null) {
+ builder.clearResourceKey();
+ return;
+ }
+ builder.setResourceKey(key);
+ }
+
+ private void mergeLocalToBuilder() {
+ if (applicationId != null) {
+ builder.setApplicationId(convertToProtoFormat(this.applicationId));
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = UseSharedCacheResourceRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+ return new ApplicationIdPBImpl(p);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl) t).getProto();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java
new file mode 100644
index 0000000..0dd44c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProtoOrBuilder;
+
+public class UseSharedCacheResourceResponsePBImpl extends
+ UseSharedCacheResourceResponse {
+ UseSharedCacheResourceResponseProto proto =
+ UseSharedCacheResourceResponseProto
+ .getDefaultInstance();
+ UseSharedCacheResourceResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public UseSharedCacheResourceResponsePBImpl() {
+ builder = UseSharedCacheResourceResponseProto.newBuilder();
+ }
+
+ public UseSharedCacheResourceResponsePBImpl(
+ UseSharedCacheResourceResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public UseSharedCacheResourceResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public String getPath() {
+ UseSharedCacheResourceResponseProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.hasPath()) ? p.getPath() : null;
+ }
+
+ @Override
+ public void setPath(String path) {
+ maybeInitBuilder();
+ if (path == null) {
+ builder.clearPath();
+ return;
+ }
+ builder.setPath(path);
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = UseSharedCacheResourceResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index af3b5aa..0ff989e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1459,19 +1459,36 @@
</property>
<property>
- <description>The algorithm used to compute checksums of files (SHA-256 by default)</description>
+ <description>The address of the client interface in the SCM
+ (shared cache manager)</description>
+ <name>yarn.sharedcache.client-server.address</name>
+ <value>0.0.0.0:8045</value>
+ </property>
+
+ <property>
+ <description>The number of threads used to handle shared cache manager
+ requests from clients (50 by default)</description>
+ <name>yarn.sharedcache.client-server.thread-count</name>
+ <value>50</value>
+ </property>
+
+ <property>
+ <description>The algorithm used to compute checksums of files (SHA-256 by
+ default)</description>
<name>yarn.sharedcache.checksum.algo.impl</name>
<value>org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl</value>
</property>
<property>
- <description>The replication factor for the node manager uploader for the shared cache (10 by default)</description>
+ <description>The replication factor for the node manager uploader for the
+ shared cache (10 by default)</description>
<name>yarn.sharedcache.nm.uploader.replication.factor</name>
<value>10</value>
</property>
<property>
- <description>The number of threads used to upload files from a node manager instance (20 by default)</description>
+ <description>The number of threads used to upload files from a node manager
+ instance (20 by default)</description>
<name>yarn.sharedcache.nm.uploader.thread-count</name>
<value>20</value>
</property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java
new file mode 100644
index 0000000..bd13573
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
+
+/**
+ * This service handles all rpc calls from the client to the shared cache
+ * manager.
+ */
+@Private
+@Evolving
+public class ClientProtocolService extends AbstractService implements
+ ClientSCMProtocol {
+
+ private static final Log LOG = LogFactory.getLog(ClientProtocolService.class);
+
+ private final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ private Server server;
+ InetSocketAddress clientBindAddress;
+ private final SCMStore store;
+ private int cacheDepth;
+ private String cacheRoot;
+ private ClientSCMMetrics metrics;
+
+ public ClientProtocolService(SCMStore store) {
+ super(ClientProtocolService.class.getName());
+ this.store = store;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ this.clientBindAddress = getBindAddress(conf);
+
+ this.cacheDepth = SharedCacheUtil.getCacheDepth(conf);
+
+ this.cacheRoot =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+
+ super.serviceInit(conf);
+ }
+
+ InetSocketAddress getBindAddress(Configuration conf) {
+ return conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ Configuration conf = getConfig();
+ this.metrics = ClientSCMMetrics.initSingleton(conf);
+
+ YarnRPC rpc = YarnRPC.create(conf);
+ this.server =
+ rpc.getServer(ClientSCMProtocol.class, this,
+ clientBindAddress,
+ conf, null, // Secret manager null for now (security not supported)
+ conf.getInt(YarnConfiguration.SCM_CLIENT_SERVER_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_THREAD_COUNT));
+
+ // TODO (YARN-2774): Enable service authorization
+
+ this.server.start();
+ clientBindAddress =
+ conf.updateConnectAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
+ server.getListenerAddress());
+
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (this.server != null) {
+ this.server.stop();
+ }
+
+ super.serviceStop();
+ }
+
+ @Override
+ public UseSharedCacheResourceResponse use(
+ UseSharedCacheResourceRequest request) throws YarnException,
+ IOException {
+
+ UseSharedCacheResourceResponse response =
+ recordFactory.newRecordInstance(UseSharedCacheResourceResponse.class);
+
+ UserGroupInformation callerUGI;
+ try {
+ callerUGI = UserGroupInformation.getCurrentUser();
+ } catch (IOException ie) {
+ LOG.info("Error getting UGI ", ie);
+ throw RPCUtil.getRemoteException(ie);
+ }
+
+ String fileName =
+ this.store.addResourceReference(request.getResourceKey(),
+ new SharedCacheResourceReference(request.getAppId(),
+ callerUGI.getShortUserName()));
+
+ if (fileName != null) {
+ response
+ .setPath(getCacheEntryFilePath(request.getResourceKey(), fileName));
+ this.metrics.incCacheHitCount();
+ } else {
+ this.metrics.incCacheMissCount();
+ }
+
+ return response;
+ }
+
+ @Override
+ public ReleaseSharedCacheResourceResponse release(
+ ReleaseSharedCacheResourceRequest request) throws YarnException,
+ IOException {
+
+ ReleaseSharedCacheResourceResponse response =
+ recordFactory
+ .newRecordInstance(ReleaseSharedCacheResourceResponse.class);
+
+ UserGroupInformation callerUGI;
+ try {
+ callerUGI = UserGroupInformation.getCurrentUser();
+ } catch (IOException ie) {
+ LOG.info("Error getting UGI ", ie);
+ throw RPCUtil.getRemoteException(ie);
+ }
+
+ boolean removed =
+ this.store.removeResourceReference(
+ request.getResourceKey(),
+ new SharedCacheResourceReference(request.getAppId(), callerUGI
+ .getShortUserName()), true);
+
+ if (removed) {
+ this.metrics.incCacheRelease();
+ }
+
+ return response;
+ }
+
+ private String getCacheEntryFilePath(String checksum, String filename) {
+ return SharedCacheUtil.getCacheEntryPath(this.cacheDepth,
+ this.cacheRoot, checksum) + Path.SEPARATOR_CHAR + filename;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
index ab50727..c54e470 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -71,6 +71,9 @@ public class SharedCacheManager extends CompositeService {
createNMCacheUploaderSCMProtocolService(store);
addService(nms);
+ ClientProtocolService cps = createClientProtocolService(store);
+ addService(cps);
+
// init metrics
DefaultMetricsSystem.initialize("SharedCacheManager");
JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -106,6 +109,10 @@ public class SharedCacheManager extends CompositeService {
return new SharedCacheUploaderService(store);
}
+ private ClientProtocolService createClientProtocolService(SCMStore store) {
+ return new ClientProtocolService(store);
+ }
+
@Override
protected void serviceStop() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java
new file mode 100644
index 0000000..3ae88e0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class is for maintaining client requests metrics
+ * and publishing them through the metrics interfaces.
+ */
+@Private
+@Unstable
+@Metrics(about="Client SCM metrics", context="yarn")
+public class ClientSCMMetrics {
+
+ private static final Log LOG = LogFactory.getLog(ClientSCMMetrics.class);
+ final MetricsRegistry registry;
+
+ ClientSCMMetrics() {
+ registry = new MetricsRegistry("clientRequests");
+ LOG.debug("Initialized " + registry);
+ }
+
+ enum Singleton {
+ INSTANCE;
+
+ ClientSCMMetrics impl;
+
+ synchronized ClientSCMMetrics init(Configuration conf) {
+ if (impl == null) {
+ impl = create();
+ }
+ return impl;
+ }
+ }
+
+ public static ClientSCMMetrics initSingleton(Configuration conf) {
+ return Singleton.INSTANCE.init(conf);
+ }
+
+ public static ClientSCMMetrics getInstance() {
+ ClientSCMMetrics topMetrics = Singleton.INSTANCE.impl;
+ if (topMetrics == null) {
+ throw new IllegalStateException(
+ "The ClientSCMMetrics singleton instance is not initialized."
+ + " Have you called init first?");
+ }
+ return topMetrics;
+ }
+
+ static ClientSCMMetrics create() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+
+ ClientSCMMetrics metrics = new ClientSCMMetrics();
+ ms.register("clientRequests", null, metrics);
+ return metrics;
+ }
+
+ @Metric("Number of cache hits") MutableCounterLong cacheHits;
+ @Metric("Number of cache misses") MutableCounterLong cacheMisses;
+ @Metric("Number of cache releases") MutableCounterLong cacheReleases;
+
+ /**
+ * One cache hit event
+ */
+ public void incCacheHitCount() {
+ cacheHits.incr();
+ }
+
+ /**
+ * One cache miss event
+ */
+ public void incCacheMissCount() {
+ cacheMisses.incr();
+ }
+
+ /**
+ * One cache release event
+ */
+ public void incCacheRelease() {
+ cacheReleases.incr();
+ }
+
+ public long getCacheHits() { return cacheHits.value(); }
+ public long getCacheMisses() { return cacheMisses.value(); }
+ public long getCacheReleases() { return cacheReleases.value(); }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java
new file mode 100644
index 0000000..68f9851
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Basic unit tests for the Client to SCM Protocol Service.
+ */
+public class TestClientSCMProtocolService {
+ private static File testDir = null;
+
+ @BeforeClass
+ public static void setupTestDirs() throws IOException {
+ testDir = new File("target",
+ TestSharedCacheUploaderService.class.getCanonicalName());
+ testDir.delete();
+ testDir.mkdirs();
+ testDir = testDir.getAbsoluteFile();
+ }
+
+ @AfterClass
+ public static void cleanupTestDirs() throws IOException {
+ if (testDir != null) {
+ testDir.delete();
+ }
+ }
+
+
+ private ClientProtocolService service;
+ private ClientSCMProtocol clientSCMProxy;
+ private SCMStore store;
+ private final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ @Before
+ public void startUp() {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.SCM_STORE_CLASS,
+ InMemorySCMStore.class.getName());
+ conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());
+ AppChecker appChecker = mock(AppChecker.class);
+ store = new InMemorySCMStore(appChecker);
+ store.init(conf);
+ store.start();
+
+ service = new ClientProtocolService(store);
+ service.init(conf);
+ service.start();
+
+ YarnRPC rpc = YarnRPC.create(new Configuration());
+
+ InetSocketAddress scmAddress =
+ conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT);
+
+ clientSCMProxy =
+ (ClientSCMProtocol) rpc.getProxy(ClientSCMProtocol.class, scmAddress,
+ conf);
+ }
+
+ @After
+ public void cleanUp() {
+ if (store != null) {
+ store.stop();
+ store = null;
+ }
+
+ if (service != null) {
+ service.stop();
+ service = null;
+ }
+
+ if (clientSCMProxy != null) {
+ RPC.stopProxy(clientSCMProxy);
+ clientSCMProxy = null;
+ }
+ }
+
+ @Test
+ public void testUse_MissingEntry() throws Exception {
+ long misses = ClientSCMMetrics.getInstance().getCacheMisses();
+ UseSharedCacheResourceRequest request =
+ recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+ request.setResourceKey("key1");
+ request.setAppId(createAppId(1, 1L));
+ assertNull(clientSCMProxy.use(request).getPath());
+ assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+ .getInstance().getCacheMisses() - misses);
+ }
+
+ @Test
+ public void testUse_ExistingEntry_NoAppIds() throws Exception {
+ // Pre-populate the SCM with one cache entry
+ store.addResource("key1", "foo.jar");
+
+ long hits = ClientSCMMetrics.getInstance().getCacheHits();
+
+ UseSharedCacheResourceRequest request =
+ recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+ request.setResourceKey("key1");
+ request.setAppId(createAppId(2, 2L));
+ // Expecting default depth of 3 and under the shared cache root dir
+ String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
+ assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
+ assertEquals(1, store.getResourceReferences("key1").size());
+ assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+ .getInstance().getCacheHits() - hits);
+
+ }
+
+ @Test
+ public void testUse_ExistingEntry_OneId() throws Exception {
+ // Pre-populate the SCM with one cache entry
+ store.addResource("key1", "foo.jar");
+ store.addResourceReference("key1",
+ new SharedCacheResourceReference(createAppId(1, 1L), "user"));
+ assertEquals(1, store.getResourceReferences("key1").size());
+ long hits = ClientSCMMetrics.getInstance().getCacheHits();
+
+ // Add a new distinct appId
+ UseSharedCacheResourceRequest request =
+ recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+ request.setResourceKey("key1");
+ request.setAppId(createAppId(2, 2L));
+
+ // Expecting default depth of 3 under the shared cache root dir
+ String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
+ assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
+ assertEquals(2, store.getResourceReferences("key1").size());
+ assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+ .getInstance().getCacheHits() - hits);
+ }
+
+ @Test
+ public void testUse_ExistingEntry_DupId() throws Exception {
+ // Pre-populate the SCM with one cache entry
+ store.addResource("key1", "foo.jar");
+ UserGroupInformation testUGI = UserGroupInformation.getCurrentUser();
+ store.addResourceReference("key1",
+ new SharedCacheResourceReference(createAppId(1, 1L),
+ testUGI.getShortUserName()));
+ assertEquals(1, store.getResourceReferences("key1").size());
+
+ long hits = ClientSCMMetrics.getInstance().getCacheHits();
+
+ // Add a new duplicate appId
+ UseSharedCacheResourceRequest request =
+ recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+ request.setResourceKey("key1");
+ request.setAppId(createAppId(1, 1L));
+
+ // Expecting default depth of 3 under the shared cache root dir
+ String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
+ assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
+ assertEquals(1, store.getResourceReferences("key1").size());
+
+ assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+ .getInstance().getCacheHits() - hits);
+ }
+
+ @Test
+ public void testRelease_ExistingEntry_NonExistantAppId() throws Exception {
+ // Pre-populate the SCM with one cache entry
+ store.addResource("key1", "foo.jar");
+ store.addResourceReference("key1",
+ new SharedCacheResourceReference(createAppId(1, 1L), "user"));
+ assertEquals(1, store.getResourceReferences("key1").size());
+
+ long releases = ClientSCMMetrics.getInstance().getCacheReleases();
+
+ ReleaseSharedCacheResourceRequest request =
+ recordFactory
+ .newRecordInstance(ReleaseSharedCacheResourceRequest.class);
+ request.setResourceKey("key1");
+ request.setAppId(createAppId(2, 2L));
+ clientSCMProxy.release(request);
+ assertEquals(1, store.getResourceReferences("key1").size());
+
+ assertEquals(
+ "Client SCM metrics were updated when a release did not happen", 0,
+ ClientSCMMetrics.getInstance().getCacheReleases() - releases);
+
+ }
+
+ @Test
+ public void testRelease_ExistingEntry_WithAppId() throws Exception {
+ // Pre-populate the SCM with one cache entry
+ store.addResource("key1", "foo.jar");
+ UserGroupInformation testUGI = UserGroupInformation.getCurrentUser();
+ store.addResourceReference("key1",
+ new SharedCacheResourceReference(createAppId(1, 1L),
+ testUGI.getShortUserName()));
+ assertEquals(1, store.getResourceReferences("key1").size());
+
+ long releases = ClientSCMMetrics.getInstance().getCacheReleases();
+
+ ReleaseSharedCacheResourceRequest request =
+ recordFactory
+ .newRecordInstance(ReleaseSharedCacheResourceRequest.class);
+ request.setResourceKey("key1");
+ request.setAppId(createAppId(1, 1L));
+ clientSCMProxy.release(request);
+ assertEquals(0, store.getResourceReferences("key1").size());
+
+ assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+ .getInstance().getCacheReleases() - releases);
+
+ }
+
+ @Test
+ public void testRelease_MissingEntry() throws Exception {
+
+ long releases = ClientSCMMetrics.getInstance().getCacheReleases();
+
+ ReleaseSharedCacheResourceRequest request =
+ recordFactory
+ .newRecordInstance(ReleaseSharedCacheResourceRequest.class);
+ request.setResourceKey("key2");
+ request.setAppId(createAppId(2, 2L));
+ clientSCMProxy.release(request);
+ assertNotNull(store.getResourceReferences("key2"));
+ assertEquals(0, store.getResourceReferences("key2").size());
+ assertEquals(
+ "Client SCM metrics were updated when a release did not happen.", 0,
+ ClientSCMMetrics.getInstance().getCacheReleases() - releases);
+ }
+
+ private ApplicationId createAppId(int id, long timestamp) {
+ return ApplicationId.newInstance(timestamp, id);
+ }
+}