You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/01/16 19:31:24 UTC
[11/25] hadoop git commit: YARN-2217. [YARN-1492] Shared cache client
side changes. (Chris Trezzo via kasha)
YARN-2217. [YARN-1492] Shared cache client side changes. (Chris Trezzo via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba5116ec
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba5116ec
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba5116ec
Branch: refs/heads/YARN-2928
Commit: ba5116ec8e0c075096c6f84a8c8a1c6ce8297cf2
Parents: 5805dc0
Author: Karthik Kambatla <ka...@apache.org>
Authored: Thu Jan 15 11:13:47 2015 +0530
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Thu Jan 15 14:28:44 2015 +0530
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../yarn/client/api/SharedCacheClient.java | 108 ++++++++++++
.../client/api/impl/SharedCacheClientImpl.java | 166 ++++++++++++++++++
.../api/impl/TestSharedCacheClientImpl.java | 170 +++++++++++++++++++
4 files changed, 447 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba5116ec/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a398347..5716b50 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -75,6 +75,9 @@ Release 2.7.0 - UNRELEASED
YARN-2427. Added the API of moving apps between queues in RM web services.
(Varun Vasudev via zjshen)
+ YARN-2217. [YARN-1492] Shared cache client side changes.
+ (Chris Trezzo via kasha)
+
IMPROVEMENTS
YARN-2950. Change message to mandate, not suggest JS requirement on UI.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba5116ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java
new file mode 100644
index 0000000..7cbe0e1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.impl.SharedCacheClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * This is the client for YARN's shared cache.
+ *
+ * <p>
+ * Instances are created with {@link #createSharedCacheClient()} and follow the
+ * {@link AbstractService} lifecycle: a client must be initialized and started
+ * before its other methods are called, and stopped once it is no longer
+ * needed.
+ * </p>
+ */
+@Public
+@Unstable
+public abstract class SharedCacheClient extends AbstractService {
+
+  /**
+   * Create a new instance of the default shared cache client implementation.
+   * The returned client has not been initialized or started yet.
+   *
+   * @return a new, uninitialized shared cache client
+   */
+  @Public
+  public static SharedCacheClient createSharedCacheClient() {
+    return new SharedCacheClientImpl();
+  }
+
+  /**
+   * Create a shared cache client service with the given name.
+   *
+   * @param name the name of the service
+   */
+  @Private
+  public SharedCacheClient(String name) {
+    super(name);
+  }
+
+  /**
+   * <p>
+   * The method to claim a resource with the <code>SharedCacheManager</code>.
+   * The client uses a checksum to identify the resource and an
+   * {@link ApplicationId} to identify which application will be using the
+   * resource.
+   * </p>
+   *
+   * <p>
+   * The <code>SharedCacheManager</code> responds with whether or not the
+   * resource exists in the cache. If the resource exists, a <code>Path</code>
+   * to the resource in the shared cache is returned. If the resource does not
+   * exist, null is returned instead.
+   * </p>
+   *
+   * @param applicationId ApplicationId of the application using the resource
+   * @param resourceKey the key (i.e. checksum) that identifies the resource
+   * @return Path to the resource, or null if it does not exist
+   * @throws YarnException if the request to the
+   *           <code>SharedCacheManager</code> fails
+   */
+  @Public
+  @Unstable
+  public abstract Path use(ApplicationId applicationId, String resourceKey)
+      throws YarnException;
+
+  /**
+   * <p>
+   * The method to release a resource with the <code>SharedCacheManager</code>.
+   * This method is called once an application is no longer using a claimed
+   * resource in the shared cache. The client uses a checksum to identify the
+   * resource and an {@link ApplicationId} to identify which application is
+   * releasing the resource.
+   * </p>
+   *
+   * <p>
+   * Note: This method is an optimization and the client is not required to call
+   * it for correctness.
+   * </p>
+   *
+   * @param applicationId ApplicationId of the application releasing the
+   *          resource
+   * @param resourceKey the key (i.e. checksum) that identifies the resource
+   * @throws YarnException if the request to the
+   *           <code>SharedCacheManager</code> fails
+   */
+  @Public
+  @Unstable
+  public abstract void release(ApplicationId applicationId, String resourceKey)
+      throws YarnException;
+
+  /**
+   * A convenience method to calculate the checksum of a specified file.
+   *
+   * @param sourceFile A path to the input file
+   * @return A hex string containing the checksum digest
+   * @throws IOException if the file cannot be opened or read
+   */
+  @Public
+  @Unstable
+  public abstract String getFileChecksum(Path sourceFile) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba5116ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java
new file mode 100644
index 0000000..0a61ee0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.SharedCacheClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum;
+import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * An implementation of the SharedCacheClient API.
+ *
+ * <p>
+ * The RPC proxy to the SharedCacheManager is created in
+ * {@link #serviceStart()} and released in {@link #serviceStop()}; the
+ * configuration and checksum algorithm are captured in
+ * {@link #serviceInit(Configuration)}.
+ * </p>
+ */
+@Private
+@Unstable
+public class SharedCacheClientImpl extends SharedCacheClient {
+  private static final Log LOG = LogFactory
+      .getLog(SharedCacheClientImpl.class);
+
+  private ClientSCMProtocol scmClient;
+  private InetSocketAddress scmAddress;
+  private Configuration conf;
+  private SharedCacheChecksum checksum;
+
+  public SharedCacheClientImpl() {
+    super(SharedCacheClientImpl.class.getName());
+  }
+
+  /**
+   * Read the SharedCacheManager client-server address from the
+   * configuration, falling back to the YARN defaults.
+   */
+  private static InetSocketAddress getScmAddress(Configuration conf) {
+    return conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (this.scmAddress == null) {
+      this.scmAddress = getScmAddress(conf);
+    }
+    this.conf = conf;
+    this.checksum = SharedCacheChecksumFactory.getChecksum(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    this.scmClient = createClientProxy();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to Shared Cache Manager at " + this.scmAddress);
+    }
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    stopClientProxy();
+    super.serviceStop();
+  }
+
+  /**
+   * Create the RPC proxy used to talk to the SharedCacheManager. Tests
+   * override this to inject a mock protocol.
+   */
+  @VisibleForTesting
+  protected ClientSCMProtocol createClientProxy() {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    return (ClientSCMProtocol) rpc.getProxy(ClientSCMProtocol.class,
+        this.scmAddress, getConfig());
+  }
+
+  /** Stop the RPC proxy, if one was created, and forget it. */
+  @VisibleForTesting
+  protected void stopClientProxy() {
+    if (this.scmClient != null) {
+      RPC.stopProxy(this.scmClient);
+      this.scmClient = null;
+    }
+  }
+
+  /**
+   * Return the proxy created by {@link #serviceStart()}.
+   *
+   * @throws YarnException if the client has not been started or its proxy has
+   *           already been stopped, instead of failing later with a
+   *           NullPointerException
+   */
+  private ClientSCMProtocol getScmClient() throws YarnException {
+    ClientSCMProtocol client = this.scmClient;
+    if (client == null) {
+      throw new YarnException(
+          "SharedCacheClient is not started; call init() and start() first");
+    }
+    return client;
+  }
+
+  @Override
+  public Path use(ApplicationId applicationId, String resourceKey)
+      throws YarnException {
+    ClientSCMProtocol client = getScmClient();
+    Path resourcePath = null;
+    UseSharedCacheResourceRequest request = Records.newRecord(
+        UseSharedCacheResourceRequest.class);
+    request.setAppId(applicationId);
+    request.setResourceKey(resourceKey);
+    try {
+      UseSharedCacheResourceResponse response = client.use(request);
+      if (response != null && response.getPath() != null) {
+        resourcePath = new Path(response.getPath());
+      }
+    } catch (YarnException e) {
+      // Already the declared type: rethrow as-is rather than wrapping it in
+      // a second YarnException, so callers see the original error.
+      throw e;
+    } catch (Exception e) {
+      // Just catching IOException isn't enough.
+      // RPC call can throw ConnectionException.
+      // We don't handle different exceptions separately at this point.
+      throw new YarnException(e);
+    }
+    return resourcePath;
+  }
+
+  @Override
+  public void release(ApplicationId applicationId, String resourceKey)
+      throws YarnException {
+    ClientSCMProtocol client = getScmClient();
+    ReleaseSharedCacheResourceRequest request = Records.newRecord(
+        ReleaseSharedCacheResourceRequest.class);
+    request.setAppId(applicationId);
+    request.setResourceKey(resourceKey);
+    try {
+      // We do not care about the response because it is empty.
+      client.release(request);
+    } catch (YarnException e) {
+      // Already the declared type: rethrow as-is rather than double-wrapping.
+      throw e;
+    } catch (Exception e) {
+      // Just catching IOException isn't enough.
+      // RPC call can throw ConnectionException.
+      throw new YarnException(e);
+    }
+  }
+
+  @Override
+  public String getFileChecksum(Path sourceFile)
+      throws IOException {
+    FileSystem fs = sourceFile.getFileSystem(this.conf);
+    FSDataInputStream in = null;
+    try {
+      in = fs.open(sourceFile);
+      return this.checksum.computeChecksum(in);
+    } finally {
+      // Always release the stream, even if computing the checksum fails.
+      if (in != null) {
+        in.close();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba5116ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java
new file mode 100644
index 0000000..3985e54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link SharedCacheClientImpl}. The RPC proxy to the
+ * SharedCacheManager is replaced with a Mockito mock, so no SCM is needed.
+ */
+public class TestSharedCacheClientImpl {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestSharedCacheClientImpl.class);
+
+  public static SharedCacheClientImpl client;
+  public static ClientSCMProtocol cProtocol;
+  private static Path TEST_ROOT_DIR;
+  private static FileSystem localFs;
+  private static final String input = "This is a test file.";
+  // SHA-256 digest of input; testChecksum relies on the default checksum
+  // algorithm producing this value.
+  private static final String inputChecksumSHA256 =
+      "f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de";
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    localFs = FileSystem.getLocal(new Configuration());
+    TEST_ROOT_DIR =
+        new Path("target", TestSharedCacheClientImpl.class.getName()
+            + "-tmpDir").makeQualified(localFs.getUri(),
+            localFs.getWorkingDirectory());
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    try {
+      if (localFs != null) {
+        try {
+          // Remove the files created by makeFile() so reruns start clean.
+          if (TEST_ROOT_DIR != null) {
+            localFs.delete(TEST_ROOT_DIR, true);
+          }
+        } finally {
+          localFs.close();
+        }
+      }
+    } catch (IOException ioe) {
+      LOG.info("IO exception in cleaning up the test file system", ioe);
+    }
+  }
+
+  /**
+   * Create a started client whose SCM proxy is the mock {@link #cProtocol}.
+   */
+  @Before
+  public void setup() {
+    cProtocol = mock(ClientSCMProtocol.class);
+    client = new SharedCacheClientImpl() {
+      @Override
+      protected ClientSCMProtocol createClientProxy() {
+        return cProtocol;
+      }
+
+      @Override
+      protected void stopClientProxy() {
+        // do nothing because it is mocked
+      }
+    };
+    client.init(new Configuration());
+    client.start();
+  }
+
+  @After
+  public void cleanup() {
+    if (client != null) {
+      client.stop();
+      client = null;
+    }
+  }
+
+  @Test
+  public void testUse() throws Exception {
+    Path file = new Path("viewfs://test/path");
+    UseSharedCacheResourceResponse response =
+        new UseSharedCacheResourceResponsePBImpl();
+    response.setPath(file.toString());
+    when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
+        response);
+    Path newPath = client.use(mock(ApplicationId.class), "key");
+    assertEquals(file, newPath);
+  }
+
+  @Test(expected = YarnException.class)
+  public void testUseError() throws Exception {
+    String message = "Mock IOException!";
+    when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenThrow(
+        new IOException(message));
+    client.use(mock(ApplicationId.class), "key");
+  }
+
+  @Test(expected = YarnException.class)
+  public void testUseBeforeStart() throws Exception {
+    // A client that was never started has no SCM proxy to talk to.
+    new SharedCacheClientImpl().use(mock(ApplicationId.class), "key");
+  }
+
+  @Test
+  public void testRelease() throws Exception {
+    // Release does not care about the return value because it is empty
+    when(cProtocol.release(isA(ReleaseSharedCacheResourceRequest.class)))
+        .thenReturn(null);
+    client.release(mock(ApplicationId.class), "key");
+    // The request must actually reach the SCM.
+    verify(cProtocol).release(isA(ReleaseSharedCacheResourceRequest.class));
+  }
+
+  @Test(expected = YarnException.class)
+  public void testReleaseError() throws Exception {
+    String message = "Mock IOException!";
+    when(cProtocol.release(isA(ReleaseSharedCacheResourceRequest.class)))
+        .thenThrow(new IOException(message));
+    client.release(mock(ApplicationId.class), "key");
+  }
+
+  @Test
+  public void testChecksum() throws Exception {
+    String filename = "test1.txt";
+    Path file = makeFile(filename);
+    assertEquals(inputChecksumSHA256, client.getFileChecksum(file));
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testNonexistentFileChecksum() throws Exception {
+    Path file = new Path(TEST_ROOT_DIR, "non-existent-file");
+    client.getFileChecksum(file);
+  }
+
+  /** Write {@link #input} to a new file under TEST_ROOT_DIR. */
+  private Path makeFile(String filename) throws Exception {
+    Path file = new Path(TEST_ROOT_DIR, filename);
+    DataOutputStream out = null;
+    try {
+      out = localFs.create(file);
+      out.write(input.getBytes("UTF-8"));
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+    return file;
+  }
+}