You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2017/01/18 00:32:49 UTC
reef git commit: [REEF-1710] Create a unit test that checks the YARN
Unmanaged AM API
Repository: reef
Updated Branches:
refs/heads/master f78355489 -> c030e7366
[REEF-1710] Create a unit test that checks the YARN Unmanaged AM API
JIRA:
[REEF-1710](https://issues.apache.org/jira/browse/REEF-1710)
Pull request:
Closes #1224
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c030e736
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c030e736
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c030e736
Branch: refs/heads/master
Commit: c030e7366675bb44a735b7ffc5cf5fe5940c449d
Parents: f783554
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Wed Jan 11 18:17:34 2017 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Tue Jan 17 15:55:44 2017 -0800
----------------------------------------------------------------------
.../yarn/driver/unmanaged/UnmanagedAmTest.java | 220 +++++++++++++++++++
.../yarn/driver/unmanaged/package-info.java | 22 ++
2 files changed, 242 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/c030e736/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java
new file mode 100644
index 0000000..130f148
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.unmanaged;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Test REEF Driver in Unmanaged AM mode on YARN.
+ * <p>
+ * The test class itself serves as the callback handler for both the asynchronous
+ * Resource Manager client and the asynchronous Node Manager client. The test never
+ * requests any containers, so all container-related callbacks fail loudly if invoked.
+ */
+public final class UnmanagedAmTest implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
+
+  private static final Logger LOG = Logger.getLogger(UnmanagedAmTest.class.getName());
+
+  /**
+   * Submit an application flagged as unmanaged AM, register and immediately unregister
+   * the Application Master from within the test process, and check the final application report.
+   * <p>
+   * The test is skipped unless the environment variable {@code REEF_TEST_YARN} parses as {@code true}.
+   *
+   * @throws IOException if the security tokens cannot be read or a YARN call fails on I/O.
+   * @throws YarnException if a call to the YARN Resource Manager fails.
+   */
+  @Test
+  public void testAmShutdown() throws IOException, YarnException {
+
+    Assume.assumeTrue(
+        "This test requires a YARN Resource Manager to connect to",
+        Boolean.parseBoolean(System.getenv("REEF_TEST_YARN")));
+
+    final YarnConfiguration yarnConfig = new YarnConfiguration();
+
+    // Start YARN client and register the application
+
+    final YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(yarnConfig);
+    yarnClient.start();
+
+    // Every started service is stopped in a finally block, so that a failed
+    // assertion or an exception does not leave client threads running.
+    try {
+
+      final ContainerLaunchContext containerContext = Records.newRecord(ContainerLaunchContext.class);
+      containerContext.setCommands(Collections.<String>emptyList());
+      containerContext.setLocalResources(Collections.<String, LocalResource>emptyMap());
+      containerContext.setEnvironment(Collections.<String, String>emptyMap());
+      containerContext.setTokens(getTokens());
+
+      final ApplicationSubmissionContext appContext =
+          yarnClient.createApplication().getApplicationSubmissionContext();
+      appContext.setApplicationName("REEF_Unmanaged_AM_Test");
+      appContext.setAMContainerSpec(containerContext);
+      appContext.setUnmanagedAM(true);
+      appContext.setQueue("default");
+
+      final ApplicationId applicationId = appContext.getApplicationId();
+      LOG.log(Level.INFO, "Registered YARN application: {0}", applicationId);
+
+      yarnClient.submitApplication(appContext);
+
+      LOG.log(Level.INFO, "YARN application submitted: {0}", applicationId);
+
+      // The AM runs in this process, so it needs the AM-RM token among the current user's credentials.
+      addToken(yarnClient.getAMRMToken(applicationId));
+
+      // Start the AM
+
+      final AMRMClientAsync<AMRMClient.ContainerRequest> rmClient =
+          AMRMClientAsync.createAMRMClientAsync(1000, this);
+      rmClient.init(yarnConfig);
+      rmClient.start();
+
+      try {
+
+        final NMClientAsync nmClient = new NMClientAsyncImpl(this);
+        nmClient.init(yarnConfig);
+        nmClient.start();
+
+        try {
+
+          final RegisterApplicationMasterResponse registration =
+              rmClient.registerApplicationMaster(NetUtils.getHostname(), -1, null);
+
+          LOG.log(Level.INFO, "Unmanaged AM is running: {0}", registration);
+
+          rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "Success!", null);
+
+          LOG.log(Level.INFO, "Unregistering AM: state {0}", rmClient.getServiceState());
+
+        } finally {
+          // Shutdown the AM: clients are stopped in the reverse order of their start.
+          nmClient.stop();
+        }
+
+      } finally {
+        rmClient.stop();
+      }
+
+      // Get the final application report
+
+      final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
+      final YarnApplicationState appState = appReport.getYarnApplicationState();
+      final FinalApplicationStatus finalAttemptStatus = appReport.getFinalApplicationStatus();
+
+      LOG.log(Level.INFO, "Application {0} final attempt {1} status: {2}/{3}", new Object[] {
+          applicationId, appReport.getCurrentApplicationAttemptId(), appState, finalAttemptStatus});
+
+      Assert.assertEquals("Application must be in FINISHED state", YarnApplicationState.FINISHED, appState);
+      Assert.assertEquals("Final status must be SUCCEEDED", FinalApplicationStatus.SUCCEEDED, finalAttemptStatus);
+
+    } finally {
+      // Shutdown YARN client
+      yarnClient.stop();
+    }
+  }
+
+  /**
+   * Serialize the credentials of the current user into a byte buffer.
+   *
+   * @return a buffer that contains exactly the serialized token storage of the current user.
+   * @throws IOException if the current user cannot be determined or the credentials cannot be written.
+   */
+  private static ByteBuffer getTokens() throws IOException {
+
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    final Credentials credentials = ugi.getCredentials();
+
+    try (final DataOutputBuffer dob = new DataOutputBuffer()) {
+      credentials.writeTokenStorageToStream(dob);
+      // getData() exposes the whole backing array; only the first getLength() bytes are valid.
+      return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    }
+  }
+
+  /**
+   * Add the given AM-RM token to the credentials of the current user.
+   *
+   * @param token the token to add.
+   * @throws IOException if the current user cannot be determined.
+   */
+  private static void addToken(final Token<AMRMTokenIdentifier> token) throws IOException {
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    ugi.addToken(token);
+  }
+
+  /** Log the shutdown request; no other action is taken. */
+  @Override
+  public void onShutdownRequest() {
+    LOG.log(Level.INFO, "Shutdown requested by YARN");
+  }
+
+  // Methods below are dummy implementations of the required callbacks.
+  // The test never requests containers, so (except for getProgress) they throw if invoked.
+
+  @Override
+  public void onContainersCompleted(final List<ContainerStatus> list) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  @Override
+  public void onContainersAllocated(final List<Container> list) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  @Override
+  public void onNodesUpdated(final List<NodeReport> list) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  /** @return always 0: the test does not track any progress. */
+  @Override
+  public float getProgress() {
+    return 0;
+  }
+
+  @Override
+  public void onError(final Throwable throwable) {
+    throw new RuntimeException("This method should never be invoked", throwable);
+  }
+
+  @Override
+  public void onContainerStarted(final ContainerId containerId, final Map<String, ByteBuffer> map) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  @Override
+  public void onContainerStatusReceived(final ContainerId containerId, final ContainerStatus containerStatus) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  @Override
+  public void onContainerStopped(final ContainerId containerId) {
+    throw new RuntimeException("This method should never be invoked");
+  }
+
+  // The error callbacks keep the reported throwable as the cause, same as onError() does.
+
+  @Override
+  public void onStartContainerError(final ContainerId containerId, final Throwable throwable) {
+    throw new RuntimeException("This method should never be invoked", throwable);
+  }
+
+  @Override
+  public void onGetContainerStatusError(final ContainerId containerId, final Throwable throwable) {
+    throw new RuntimeException("This method should never be invoked", throwable);
+  }
+
+  @Override
+  public void onStopContainerError(final ContainerId containerId, final Throwable throwable) {
+    throw new RuntimeException("This method should never be invoked", throwable);
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/c030e736/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java
new file mode 100644
index 0000000..048fecf
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Unit tests for running REEF Driver in YARN Unmanaged AM mode.
+ */
+package org.apache.reef.runtime.yarn.driver.unmanaged;