You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/01/16 19:31:24 UTC

[11/25] hadoop git commit: YARN-2217. [YARN-1492] Shared cache client side changes. (Chris Trezzo via kasha)

YARN-2217. [YARN-1492] Shared cache client side changes. (Chris Trezzo via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba5116ec
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba5116ec
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba5116ec

Branch: refs/heads/YARN-2928
Commit: ba5116ec8e0c075096c6f84a8c8a1c6ce8297cf2
Parents: 5805dc0
Author: Karthik Kambatla <ka...@apache.org>
Authored: Thu Jan 15 11:13:47 2015 +0530
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Thu Jan 15 14:28:44 2015 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../yarn/client/api/SharedCacheClient.java      | 108 ++++++++++++
 .../client/api/impl/SharedCacheClientImpl.java  | 166 ++++++++++++++++++
 .../api/impl/TestSharedCacheClientImpl.java     | 170 +++++++++++++++++++
 4 files changed, 447 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba5116ec/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a398347..5716b50 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -75,6 +75,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2427. Added the API of moving apps between queues in RM web services.
     (Varun Vasudev via zjshen)
 
+    YARN-2217. [YARN-1492] Shared cache client side changes. 
+    (Chris Trezzo via kasha)
+
   IMPROVEMENTS
 
     YARN-2950. Change message to mandate, not suggest JS requirement on UI.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba5116ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java
new file mode 100644
index 0000000..7cbe0e1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/SharedCacheClient.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.impl.SharedCacheClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * This is the client for YARN's shared cache.
+ */
+@Public
+@Unstable
+public abstract class SharedCacheClient extends AbstractService {
+
+  @Public
+  public static SharedCacheClient createSharedCacheClient() {
+    SharedCacheClient client = new SharedCacheClientImpl();
+    return client;
+  }
+
+  @Private
+  public SharedCacheClient(String name) {
+    super(name);
+  }
+
+  /**
+   * <p>
+   * The method to claim a resource with the <code>SharedCacheManager.</code>
+   * The client uses a checksum to identify the resource and an
+   * {@link ApplicationId} to identify which application will be using the
+   * resource.
+   * </p>
+   * 
+   * <p>
+   * The <code>SharedCacheManager</code> responds with whether or not the
+   * resource exists in the cache. If the resource exists, a <code>Path</code>
+   * to the resource in the shared cache is returned. If the resource does not
+   * exist, null is returned instead.
+   * </p>
+   * 
+   * @param applicationId ApplicationId of the application using the resource
+   * @param resourceKey the key (i.e. checksum) that identifies the resource
+   * @return Path to the resource, or null if it does not exist
+   */
+  @Public
+  @Unstable
+  public abstract Path use(ApplicationId applicationId, String resourceKey)
+      throws YarnException;
+
+  /**
+   * <p>
+   * The method to release a resource with the <code>SharedCacheManager.</code>
+   * This method is called once an application is no longer using a claimed
+   * resource in the shared cache. The client uses a checksum to identify the
+   * resource and an {@link ApplicationId} to identify which application is
+   * releasing the resource.
+   * </p>
+   * 
+   * <p>
+   * Note: This method is an optimization and the client is not required to call
+   * it for correctness.
+   * </p>
+   * 
+   * @param applicationId ApplicationId of the application releasing the
+   *          resource
+   * @param resourceKey the key (i.e. checksum) that identifies the resource
+   */
+  @Public
+  @Unstable
+  public abstract void release(ApplicationId applicationId, String resourceKey)
+      throws YarnException;
+
+  /**
+   * A convenience method to calculate the checksum of a specified file.
+   * 
+   * @param sourceFile A path to the input file
+   * @return A hex string containing the checksum digest
+   * @throws IOException
+   */
+  @Public
+  @Unstable
+  public abstract String getFileChecksum(Path sourceFile) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba5116ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java
new file mode 100644
index 0000000..0a61ee0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/SharedCacheClientImpl.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.SharedCacheClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum;
+import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * An implementation of the SharedCacheClient API.
+ */
+@Private
+@Unstable
+public class SharedCacheClientImpl extends SharedCacheClient {
+  private static final Log LOG = LogFactory
+      .getLog(SharedCacheClientImpl.class);
+
+  private ClientSCMProtocol scmClient;
+  private InetSocketAddress scmAddress;
+  private Configuration conf;
+  private SharedCacheChecksum checksum;
+
+  public SharedCacheClientImpl() {
+    super(SharedCacheClientImpl.class.getName());
+  }
+
+  private static InetSocketAddress getScmAddress(Configuration conf) {
+    return conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (this.scmAddress == null) {
+      this.scmAddress = getScmAddress(conf);
+    }
+    this.conf = conf;
+    this.checksum = SharedCacheChecksumFactory.getChecksum(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    this.scmClient = createClientProxy();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to Shared Cache Manager at " + this.scmAddress);
+    }
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    stopClientProxy();
+    super.serviceStop();
+  }
+
+  @VisibleForTesting
+  protected ClientSCMProtocol createClientProxy() {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    return (ClientSCMProtocol) rpc.getProxy(ClientSCMProtocol.class,
+        this.scmAddress, getConfig());
+  }
+
+  @VisibleForTesting
+  protected void stopClientProxy() {
+    if (this.scmClient != null) {
+      RPC.stopProxy(this.scmClient);
+      this.scmClient = null;
+    }
+  }
+
+  @Override
+  public Path use(ApplicationId applicationId, String resourceKey)
+      throws YarnException {
+    Path resourcePath = null;
+    UseSharedCacheResourceRequest request = Records.newRecord(
+        UseSharedCacheResourceRequest.class);
+    request.setAppId(applicationId);
+    request.setResourceKey(resourceKey);
+    try {
+      UseSharedCacheResourceResponse response = this.scmClient.use(request);
+      if (response != null && response.getPath() != null) {
+        resourcePath = new Path(response.getPath());
+      }
+    } catch (Exception e) {
+      // Just catching IOException isn't enough.
+      // RPC call can throw ConnectionException.
+      // We don't handle different exceptions separately at this point.
+      throw new YarnException(e);
+    }
+    return resourcePath;
+  }
+
+  @Override
+  public void release(ApplicationId applicationId, String resourceKey)
+      throws YarnException {
+    ReleaseSharedCacheResourceRequest request = Records.newRecord(
+        ReleaseSharedCacheResourceRequest.class);
+    request.setAppId(applicationId);
+    request.setResourceKey(resourceKey);
+    try {
+      // We do not care about the response because it is empty.
+      this.scmClient.release(request);
+    } catch (Exception e) {
+      // Just catching IOException isn't enough.
+      // RPC call can throw ConnectionException.
+      throw new YarnException(e);
+    }
+  }
+
+  @Override
+  public String getFileChecksum(Path sourceFile)
+      throws IOException {
+    FileSystem fs = sourceFile.getFileSystem(this.conf);
+    FSDataInputStream in = null;
+    try {
+      in = fs.open(sourceFile);
+      return this.checksum.computeChecksum(in);
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba5116ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java
new file mode 100644
index 0000000..3985e54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestSharedCacheClientImpl.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSharedCacheClientImpl {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestSharedCacheClientImpl.class);
+
+  public static SharedCacheClientImpl client;
+  public static ClientSCMProtocol cProtocol;
+  private static Path TEST_ROOT_DIR;
+  private static FileSystem localFs;
+  private static String input = "This is a test file.";
+  private static String inputChecksumSHA256 =
+      "f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de";
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    localFs = FileSystem.getLocal(new Configuration());
+    TEST_ROOT_DIR =
+        new Path("target", TestSharedCacheClientImpl.class.getName()
+            + "-tmpDir").makeQualified(localFs.getUri(),
+            localFs.getWorkingDirectory());
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    try {
+      if (localFs != null) {
+        localFs.close();
+      }
+    } catch (IOException ioe) {
+      LOG.info("IO exception in closing file system)");
+      ioe.printStackTrace();
+    }
+  }
+
+  @Before
+  public void setup() {
+    cProtocol = mock(ClientSCMProtocol.class);
+    client = new SharedCacheClientImpl() {
+      @Override
+      protected ClientSCMProtocol createClientProxy() {
+        return cProtocol;
+      }
+
+      @Override
+      protected void stopClientProxy() {
+        // do nothing because it is mocked
+      }
+    };
+    client.init(new Configuration());
+    client.start();
+  }
+
+  @After
+  public void cleanup() {
+    if (client != null) {
+      client.stop();
+      client = null;
+    }
+  }
+
+  @Test
+  public void testUse() throws Exception {
+    Path file = new Path("viewfs://test/path");
+    UseSharedCacheResourceResponse response =
+        new UseSharedCacheResourceResponsePBImpl();
+    response.setPath(file.toString());
+    when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn(
+        response);
+    Path newPath = client.use(mock(ApplicationId.class), "key");
+    assertEquals(file, newPath);
+  }
+
+  @Test(expected = YarnException.class)
+  public void testUseError() throws Exception {
+    String message = "Mock IOExcepiton!";
+    when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenThrow(
+        new IOException(message));
+    client.use(mock(ApplicationId.class), "key");
+  }
+
+  @Test
+  public void testRelease() throws Exception {
+    // Release does not care about the return value because it is empty
+    when(cProtocol.release(isA(ReleaseSharedCacheResourceRequest.class)))
+        .thenReturn(null);
+    client.release(mock(ApplicationId.class), "key");
+  }
+
+  @Test(expected = YarnException.class)
+  public void testReleaseError() throws Exception {
+    String message = "Mock IOExcepiton!";
+    when(cProtocol.release(isA(ReleaseSharedCacheResourceRequest.class)))
+        .thenThrow(new IOException(message));
+    client.release(mock(ApplicationId.class), "key");
+  }
+
+  @Test
+  public void testChecksum() throws Exception {
+    String filename = "test1.txt";
+    Path file = makeFile(filename);
+    assertEquals(inputChecksumSHA256, client.getFileChecksum(file));
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testNonexistantFileChecksum() throws Exception {
+    Path file = new Path(TEST_ROOT_DIR, "non-existant-file");
+    client.getFileChecksum(file);
+  }
+
+  private Path makeFile(String filename) throws Exception {
+    Path file = new Path(TEST_ROOT_DIR, filename);
+    DataOutputStream out = null;
+    try {
+      out = localFs.create(file);
+      out.write(input.getBytes("UTF-8"));
+    } finally {
+      if(out != null) {
+        out.close();
+      }
+    }
+    return file;
+  }
+}