You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2017/01/18 00:32:49 UTC

reef git commit: [REEF-1710] Create a unit test that checks the YARN Unmanaged AM API

Repository: reef
Updated Branches:
  refs/heads/master f78355489 -> c030e7366


[REEF-1710] Create a unit test that checks the YARN Unmanaged AM API

JIRA:
   [REEF-1710](https://issues.apache.org/jira/browse/REEF-1710)

Pull request:
  Closes #1224


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c030e736
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c030e736
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c030e736

Branch: refs/heads/master
Commit: c030e7366675bb44a735b7ffc5cf5fe5940c449d
Parents: f783554
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Wed Jan 11 18:17:34 2017 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Tue Jan 17 15:55:44 2017 -0800

----------------------------------------------------------------------
 .../yarn/driver/unmanaged/UnmanagedAmTest.java  | 220 +++++++++++++++++++
 .../yarn/driver/unmanaged/package-info.java     |  22 ++
 2 files changed, 242 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/c030e736/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java
new file mode 100644
index 0000000..130f148
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.unmanaged;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Test REEF Driver in Unmanaged AM mode on YARN.
+ * <p>
+ * The test instance itself is the callback handler for both the asynchronous
+ * AM-RM client and the asynchronous NM client that the test creates.
+ */
+public final class UnmanagedAmTest implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
+
+  private static final Logger LOG = Logger.getLogger(UnmanagedAmTest.class.getName());
+
+  /**
+   * Submit an Unmanaged AM application to YARN, register and unregister the AM,
+   * and check that the application is reported as FINISHED with the SUCCEEDED final status.
+   * The test is skipped unless the REEF_TEST_YARN environment variable parses as boolean "true".
+   * @throws IOException if user credentials cannot be accessed or a call to YARN fails.
+   * @throws YarnException if YARN reports an error for one of the requests.
+   */
+  @Test
+  public void testAmShutdown() throws IOException, YarnException {
+
+    Assume.assumeTrue(
+        "This test requires a YARN Resource Manager to connect to",
+        Boolean.parseBoolean(System.getenv("REEF_TEST_YARN")));
+
+    final YarnConfiguration yarnConfig = new YarnConfiguration();
+
+    // Start YARN client and register the application
+
+    final YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(yarnConfig);
+    yarnClient.start();
+
+    try {
+
+      final ApplicationId applicationId = submitApplication(yarnClient);
+
+      addToken(yarnClient.getAMRMToken(applicationId));
+
+      // Start the AM, register and unregister it, and shut it down
+
+      runApplicationMaster(yarnConfig);
+
+      // Get the final application report
+
+      final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
+      final YarnApplicationState appState = appReport.getYarnApplicationState();
+      final FinalApplicationStatus finalAttemptStatus = appReport.getFinalApplicationStatus();
+
+      LOG.log(Level.INFO, "Application {0} final attempt {1} status: {2}/{3}", new Object[] {
+          applicationId, appReport.getCurrentApplicationAttemptId(), appState, finalAttemptStatus});
+
+      Assert.assertEquals("Application must be in FINISHED state", YarnApplicationState.FINISHED, appState);
+      Assert.assertEquals("Final status must be SUCCEEDED", FinalApplicationStatus.SUCCEEDED, finalAttemptStatus);
+
+    } finally {
+      // Shutdown YARN client, even if a YARN call or an assertion above has failed
+      yarnClient.stop();
+    }
+  }
+
+  /**
+   * Create a new application in YARN, mark it as an Unmanaged AM application, and submit it.
+   * The AM container spec has no commands, local resources, or environment variables;
+   * it carries only the serialized credentials of the current user.
+   * @param yarnClient YARN client to create and submit the application with.
+   * @return ID of the submitted application.
+   * @throws IOException if user credentials cannot be serialized or a call to YARN fails.
+   * @throws YarnException if YARN reports an error for one of the requests.
+   */
+  private static ApplicationId submitApplication(final YarnClient yarnClient) throws IOException, YarnException {
+
+    final ContainerLaunchContext containerContext = Records.newRecord(ContainerLaunchContext.class);
+    containerContext.setCommands(Collections.<String>emptyList());
+    containerContext.setLocalResources(Collections.<String, LocalResource>emptyMap());
+    containerContext.setEnvironment(Collections.<String, String>emptyMap());
+    containerContext.setTokens(getTokens());
+
+    final ApplicationSubmissionContext appContext = yarnClient.createApplication().getApplicationSubmissionContext();
+    appContext.setApplicationName("REEF_Unmanaged_AM_Test");
+    appContext.setAMContainerSpec(containerContext);
+    appContext.setUnmanagedAM(true);
+    appContext.setQueue("default");
+
+    final ApplicationId applicationId = appContext.getApplicationId();
+    LOG.log(Level.INFO, "Registered YARN application: {0}", applicationId);
+
+    yarnClient.submitApplication(appContext);
+
+    LOG.log(Level.INFO, "YARN application submitted: {0}", applicationId);
+
+    return applicationId;
+  }
+
+  /**
+   * Start the AM-RM and NM clients, register the AM with YARN, unregister it
+   * with the SUCCEEDED status, and stop both clients.
+   * The clients are stopped even if one of the YARN calls fails.
+   * @param yarnConfig YARN configuration to initialize the clients with.
+   * @throws IOException if a call to YARN fails.
+   * @throws YarnException if YARN reports an error for one of the requests.
+   */
+  private void runApplicationMaster(final YarnConfiguration yarnConfig) throws IOException, YarnException {
+
+    final AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(1000, this);
+    final NMClientAsync nmClient = new NMClientAsyncImpl(this);
+
+    try {
+
+      rmClient.init(yarnConfig);
+      rmClient.start();
+
+      nmClient.init(yarnConfig);
+      nmClient.start();
+
+      final RegisterApplicationMasterResponse registration =
+          rmClient.registerApplicationMaster(NetUtils.getHostname(), -1, null);
+
+      LOG.log(Level.INFO, "Unmanaged AM is running: {0}", registration);
+
+      rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "Success!", null);
+
+      LOG.log(Level.INFO, "Unregistering AM: state {0}", rmClient.getServiceState());
+
+    } finally {
+      // Shutdown the AM: stop the RM client first, and the NM client no matter what
+      try {
+        rmClient.stop();
+      } finally {
+        nmClient.stop();
+      }
+    }
+  }
+
+  /**
+   * Serialize the credentials of the current user into a byte buffer.
+   * @return a buffer with the token storage of the current user.
+   * @throws IOException if the current user or its credentials cannot be read or serialized.
+   */
+  private static ByteBuffer getTokens() throws IOException {
+
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    final Credentials credentials = ugi.getCredentials();
+
+    try (final DataOutputBuffer dob = new DataOutputBuffer()) {
+      credentials.writeTokenStorageToStream(dob);
+      // getData() exposes the entire backing array; only the first getLength() bytes are valid.
+      return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    }
+  }
+
+  /**
+   * Add the AM-RM token of the application to the current user.
+   * @param token the token to add, as returned by the YARN client for the submitted application.
+   * @throws IOException if the current user cannot be determined.
+   */
+  private static void addToken(final Token<AMRMTokenIdentifier> token) throws IOException {
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    ugi.addToken(token);
+  }
+
+  @Override
+  public void onShutdownRequest() {
+    LOG.log(Level.INFO, "Shutdown requested by YARN");
+  }
+
+  // Methods below are dummy implementations of the required callbacks.
+  // The test never requests any containers, so most of them fail fast when invoked.
+
+  @Override
+  public void onContainersCompleted(final List<ContainerStatus> list) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  @Override
+  public void onContainersAllocated(final List<Container> list) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  @Override
+  public void onNodesUpdated(final List<NodeReport> list) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  @Override
+  public float getProgress() {
+    return 0;
+  }
+
+  @Override
+  public void onError(final Throwable throwable) {
+    throw new RuntimeException("This method should never be invoked", throwable);
+  }
+
+  @Override
+  public void onContainerStarted(final ContainerId containerId, final Map<String, ByteBuffer> map) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  @Override
+  public void onContainerStatusReceived(final ContainerId containerId, final ContainerStatus containerStatus) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  @Override
+  public void onContainerStopped(final ContainerId containerId) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  // Error callbacks keep the reported error as the cause, same as onError() does.
+
+  @Override
+  public void onStartContainerError(final ContainerId containerId, final Throwable throwable) {
+    throw new RuntimeException("This method should never be invoked", throwable);
+  }
+
+  @Override
+  public void onGetContainerStatusError(final ContainerId containerId, final Throwable throwable) {
+    throw new RuntimeException("This method should never be invoked", throwable);
+  }
+
+  @Override
+  public void onStopContainerError(final ContainerId containerId, final Throwable throwable) {
+    throw new RuntimeException("This method should never be invoked", throwable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/c030e736/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java
new file mode 100644
index 0000000..048fecf
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Unit tests for running REEF Driver in YARN Unmanaged AM mode.
+ */
+package org.apache.reef.runtime.yarn.driver.unmanaged;