You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/18 06:02:48 UTC
svn commit: r1494017 [3/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/
hadoop-yarn/hadoop-yarn-appl...
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.async.impl;
+
+import static org.mockito.Matchers.anyFloat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestAMRMClientAsync {
+
+ private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
+
+ // Checks that heartbeats continue, without holding the client lock, while a
+ // callback is still running, and that allocations arrive before completions.
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testAMRMClientAsync() throws Exception {
+ Configuration conf = new Configuration();
+ final AtomicBoolean heartbeatBlock = new AtomicBoolean(true);
+ List<ContainerStatus> completed1 = Arrays.asList(
+ ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+ ContainerState.COMPLETE, "", 0));
+ List<Container> allocated1 = Arrays.asList(
+ Container.newInstance(null, null, null, null, null, null));
+ final AllocateResponse response1 = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), allocated1, null);
+ final AllocateResponse response2 = createAllocateResponse(completed1,
+ new ArrayList<Container>(), null);
+ final AllocateResponse emptyResponse = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
+
+ TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+ final AtomicInteger secondHeartbeatSync = new AtomicInteger(0);
+ when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
+ @Override
+ public AllocateResponse answer(InvocationOnMock invocation) throws Throwable {
+ secondHeartbeatSync.incrementAndGet();
+ synchronized(heartbeatBlock) { // test the flag under the monitor so the notifyAll cannot be missed
+ while(heartbeatBlock.get()) {
+ heartbeatBlock.wait();
+ }
+ }
+ secondHeartbeatSync.incrementAndGet();
+ return response2;
+ }
+ }).thenReturn(emptyResponse);
+ when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
+ .thenReturn(null);
+ when(client.getClusterAvailableResources()).thenAnswer(new Answer<Resource>() {
+ @Override
+ public Resource answer(InvocationOnMock invocation) throws Throwable {
+ // take client lock to simulate behavior of real impl
+ synchronized (client) {
+ Thread.sleep(10);
+ }
+ return null;
+ }
+ });
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+ asyncClient.init(conf);
+ asyncClient.start();
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+
+ // while the CallbackHandler will still only be processing the first response,
+ // heartbeater thread should still be sending heartbeats.
+ // To test this, wait for the second heartbeat to be received.
+ while (secondHeartbeatSync.get() < 1) {
+ Thread.sleep(10);
+ }
+
+ // heartbeat will be blocked. make sure we can call client methods at this
+ // time. Checks that heartbeat is not holding onto client lock
+ Assert.assertTrue(secondHeartbeatSync.get() < 2); // the bare assert keyword is skipped without -ea
+ asyncClient.getClusterAvailableResources();
+ // method returned. now unblock heartbeat
+ Assert.assertTrue(secondHeartbeatSync.get() < 2);
+ synchronized (heartbeatBlock) {
+ heartbeatBlock.set(false);
+ heartbeatBlock.notifyAll();
+ }
+
+ // allocated containers should come before completed containers
+ Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
+
+ // wait for the allocated containers from the first heartbeat's response
+ while (callbackHandler.takeAllocatedContainers() == null) {
+ Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
+ Thread.sleep(10);
+ }
+
+ // wait for the completed containers from the second heartbeat's response
+ while (callbackHandler.takeCompletedContainers() == null) {
+ Thread.sleep(10);
+ }
+
+ asyncClient.stop();
+
+ Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
+ Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
+ }
+
+ /**
+ * An exception thrown by allocate() must be delivered to the handler's onError().
+ */
+ @Test(timeout=10000)
+ public void testAMRMClientAsyncException() throws Exception {
+ Configuration conf = new Configuration();
+ TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+ String exStr = "TestException";
+ YarnException mockException = mock(YarnException.class);
+ when(mockException.getMessage()).thenReturn(exStr);
+ when(client.allocate(anyFloat())).thenThrow(mockException);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ synchronized (callbackHandler.notifier) {
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ // an interrupt (e.g. from the test timeout) propagates instead of being swallowed
+ while(callbackHandler.savedException == null) {
+ callbackHandler.notifier.wait();
+ }
+ }
+
+ Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr));
+
+ asyncClient.stop();
+ // stopping should have joined all threads and completed all callbacks
+ Assert.assertTrue(callbackHandler.callbackCount == 0); // i.e. getProgress() never ran
+ }
+
+ /**
+ * An AM_RESYNC command in the allocate response must reach onShutdownRequest().
+ */
+ @Test(timeout=10000)
+ public void testAMRMClientAsyncReboot() throws Exception {
+ Configuration conf = new Configuration();
+ TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+ final AllocateResponse rebootResponse = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
+ rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
+ when(client.allocate(anyFloat())).thenReturn(rebootResponse);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ synchronized (callbackHandler.notifier) {
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ // an interrupt (e.g. from the test timeout) propagates instead of being swallowed
+ while(callbackHandler.reboot == false) {
+ callbackHandler.notifier.wait();
+ }
+ }
+
+ asyncClient.stop();
+ // stopping should have joined all threads and completed all callbacks
+ Assert.assertTrue(callbackHandler.callbackCount == 0); // i.e. getProgress() never ran
+ }
+
+ // Wraps the given container lists and NM tokens in an AllocateResponse.
+ private AllocateResponse createAllocateResponse(
+ List<ContainerStatus> completed, List<Container> allocated,
+ List<NMToken> nmTokens) {
+ List<NodeReport> nodeReports = new ArrayList<NodeReport>();
+ return AllocateResponse.newInstance(0, completed, allocated, nodeReports,
+ null, null, 1, null, nmTokens);
+ }
+
+ // Assembles a ContainerId from its parts: application, then attempt, then container.
+ public static ContainerId newContainerId(int appId, int appAttemptId,
+ long timestamp, int containerId) {
+ ApplicationAttemptId attempt = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(timestamp, appId), appAttemptId);
+ return ContainerId.newInstance(attempt, containerId);
+ }
+
+ // Handler whose container callbacks block until the test thread takes the delivered list.
+ private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ private volatile List<ContainerStatus> completedContainers;
+ private volatile List<Container> allocatedContainers;
+ Exception savedException = null; // set by onError()
+ boolean reboot = false; // set by onShutdownRequest()
+ Object notifier = new Object(); // notified after savedException or reboot changes
+ int callbackCount = 0; // number of getProgress() calls
+
+ public List<ContainerStatus> takeCompletedContainers() {
+ List<ContainerStatus> ret = completedContainers;
+ if (ret == null) {
+ return null;
+ }
+ completedContainers = null;
+ synchronized (ret) { // wake the callback blocked in onContainersCompleted()
+ ret.notify();
+ }
+ return ret;
+ }
+
+ public List<Container> takeAllocatedContainers() {
+ List<Container> ret = allocatedContainers;
+ if (ret == null) {
+ return null;
+ }
+ allocatedContainers = null;
+ synchronized (ret) { // wake the callback blocked in onContainersAllocated()
+ ret.notify();
+ }
+ return ret;
+ }
+
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> statuses) {
+ completedContainers = statuses;
+ // wait until taken; lock the local reference because the field may already be null
+ synchronized (statuses) {
+ while (completedContainers != null) {
+ try {
+ statuses.wait();
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted during wait", ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onContainersAllocated(List<Container> containers) {
+ allocatedContainers = containers;
+ // wait until taken; lock the local reference because the field may already be null
+ synchronized (containers) {
+ while (allocatedContainers != null) {
+ try {
+ containers.wait();
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted during wait", ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onShutdownRequest() {
+ reboot = true;
+ synchronized (notifier) {
+ notifier.notifyAll();
+ }
+ }
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+ @Override
+ public float getProgress() {
+ callbackCount++;
+ return 0.5f;
+ }
+
+ @Override
+ public void onError(Exception e) {
+ savedException = e;
+ synchronized (notifier) {
+ notifier.notifyAll();
+ }
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.async.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.junit.After;
+import org.junit.Test;
+
+
+public class TestNMClientAsync {
+
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ private NMClientAsyncImpl asyncClient;
+ private NodeId nodeId;
+ private Token containerToken;
+
+ @After
+ public void teardown() {
+ ServiceOperations.stop(asyncClient); // stop whichever client the finished test assigned
+ }
+
+ @Test (timeout = 10000)
+ public void testNMClientAsync() throws Exception { // runs a success phase, a start/query-failure phase, then a stop-failure phase
+ Configuration conf = new Configuration();
+ conf.setInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 10);
+
+ // More containers are started than the maximum size of the thread pool
+ int expectedSuccess = 40;
+ int expectedFailure = 40;
+
+ asyncClient = new MockNMClientAsync1(expectedSuccess, expectedFailure);
+ asyncClient.init(conf);
+ Assert.assertEquals("The max thread pool size is not correctly set",
+ 10, asyncClient.maxThreadPoolSize);
+ asyncClient.start();
+
+
+ for (int i = 0; i < expectedSuccess + expectedFailure; ++i) {
+ if (i == expectedSuccess) { // success batch issued: wait for its callbacks, then swap clients
+ while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
+ .isAllSuccessCallsExecuted()) {
+ Thread.sleep(10);
+ }
+ asyncClient.setClient(mockNMClient(1)); // mode 1: start, query and stop all throw
+ }
+ Container container = mockContainer(i);
+ ContainerLaunchContext clc =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ asyncClient.startContainerAsync(container, clc);
+ }
+ while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
+ .isStartAndQueryFailureCallsExecuted()) {
+ Thread.sleep(10);
+ }
+ asyncClient.setClient(mockNMClient(2)); // mode 2: start and query succeed, stop throws
+ ((TestCallbackHandler1) asyncClient.getCallbackHandler()).path = false; // onContainerStarted now goes straight to stopContainerAsync
+ for (int i = 0; i < expectedFailure; ++i) {
+ Container container = mockContainer(
+ expectedSuccess + expectedFailure + i);
+ ContainerLaunchContext clc =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ asyncClient.startContainerAsync(container, clc);
+ }
+ while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
+ .isStopFailureCallsExecuted()) {
+ Thread.sleep(10);
+ }
+ for (String errorMsg :
+ ((TestCallbackHandler1) asyncClient.getCallbackHandler())
+ .errorMsgs) {
+ System.out.println(errorMsg);
+ }
+ Assert.assertEquals("Error occurs in CallbackHandler", 0,
+ ((TestCallbackHandler1) asyncClient.getCallbackHandler())
+ .errorMsgs.size());
+ for (String errorMsg : ((MockNMClientAsync1) asyncClient).errorMsgs) {
+ System.out.println(errorMsg);
+ }
+ Assert.assertEquals("Error occurs in ContainerEventProcessor", 0,
+ ((MockNMClientAsync1) asyncClient).errorMsgs.size());
+ // Even after all callbacks have run, the event processor threads may not
+ // have terminated yet and the containers may not have been removed yet.
+ while (asyncClient.containers.size() > 0) {
+ Thread.sleep(10);
+ }
+ asyncClient.stop();
+ Assert.assertFalse(
+ "The thread of Container Management Event Dispatcher is still alive",
+ asyncClient.eventDispatcherThread.isAlive());
+ Assert.assertTrue("The thread pool is not shut down",
+ asyncClient.threadPool.isShutdown());
+ }
+
+ private class MockNMClientAsync1 extends NMClientAsyncImpl { // records RuntimeExceptions that escape its event processors
+ private Set<String> errorMsgs =
+ Collections.synchronizedSet(new HashSet<String>());
+
+ protected MockNMClientAsync1(int expectedSuccess, int expectedFailure)
+ throws YarnException, IOException {
+ super(MockNMClientAsync1.class.getName(), mockNMClient(0),
+ new TestCallbackHandler1(expectedSuccess, expectedFailure));
+ }
+
+ private class MockContainerEventProcessor extends ContainerEventProcessor {
+ public MockContainerEventProcessor(ContainerEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void run() {
+ try {
+ super.run();
+ } catch (RuntimeException e) {
+ // A RuntimeException thrown by an error callback would escape
+ // ContainerEventProcessor.run(); if one does reach this point,
+ // record it so that the test can fail on it.
+ errorMsgs.add("Unexpected throwable from callback functions should" +
+ " be ignored by Container " + event.getContainerId());
+ }
+ }
+ }
+
+ @Override
+ protected ContainerEventProcessor getContainerEventProcessor(
+ ContainerEvent event) {
+ return new MockContainerEventProcessor(event); // substitute the recording processor
+ }
+ }
+
+ private class TestCallbackHandler1
+ implements NMClientAsync.CallbackHandler { // counts each callback per container and records unexpected ones in errorMsgs
+
+ private boolean path = true; // true: start -> status -> stop chain; false: start -> stop. NOTE(review): set by the test thread, read in callbacks, not volatile - confirm visibility
+
+ private int expectedSuccess;
+ private int expectedFailure;
+
+ private AtomicInteger actualStartSuccess = new AtomicInteger(0);
+ private AtomicInteger actualStartFailure = new AtomicInteger(0);
+ private AtomicInteger actualQuerySuccess = new AtomicInteger(0);
+ private AtomicInteger actualQueryFailure = new AtomicInteger(0);
+ private AtomicInteger actualStopSuccess = new AtomicInteger(0);
+ private AtomicInteger actualStopFailure = new AtomicInteger(0);
+
+ private AtomicIntegerArray actualStartSuccessArray; // one slot per container, set to 1 when its callback ran
+ private AtomicIntegerArray actualStartFailureArray;
+ private AtomicIntegerArray actualQuerySuccessArray;
+ private AtomicIntegerArray actualQueryFailureArray;
+ private AtomicIntegerArray actualStopSuccessArray;
+ private AtomicIntegerArray actualStopFailureArray;
+
+ private Set<String> errorMsgs =
+ Collections.synchronizedSet(new HashSet<String>());
+
+ public TestCallbackHandler1(int expectedSuccess, int expectedFailure) {
+ this.expectedSuccess = expectedSuccess;
+ this.expectedFailure = expectedFailure;
+
+ actualStartSuccessArray = new AtomicIntegerArray(expectedSuccess);
+ actualStartFailureArray = new AtomicIntegerArray(expectedFailure);
+ actualQuerySuccessArray = new AtomicIntegerArray(expectedSuccess);
+ actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
+ actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
+ actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
+ }
+
+ @Override
+ public void onContainerStarted(ContainerId containerId,
+ Map<String, ByteBuffer> allServiceResponse) {
+ if (path) {
+ if (containerId.getId() >= expectedSuccess) { // ids at or above expectedSuccess are expected to fail
+ errorMsgs.add("Container " + containerId +
+ " should throw the exception onContainerStarted");
+ return;
+ }
+ actualStartSuccess.addAndGet(1);
+ actualStartSuccessArray.set(containerId.getId(), 1);
+
+ // move on to the following success tests
+ asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
+ } else {
+ // move on to the following failure tests
+ asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
+ }
+
+ // Thrown on purpose: an exception from a callback shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId,
+ ContainerStatus containerStatus) {
+ if (containerId.getId() >= expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " should throw the exception onContainerStatusReceived");
+ return;
+ }
+ actualQuerySuccess.addAndGet(1);
+ actualQuerySuccessArray.set(containerId.getId(), 1);
+ // move on to the following success tests
+ asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
+
+ // Thrown on purpose: an exception from a callback shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @Override
+ public void onContainerStopped(ContainerId containerId) {
+ if (containerId.getId() >= expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " should throw the exception onContainerStopped");
+ return;
+ }
+ actualStopSuccess.addAndGet(1);
+ actualStopSuccessArray.set(containerId.getId(), 1);
+
+ // Thrown on purpose: an exception from a callback shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ // If the unexpected throwable comes from success callback functions, it
+ // will be handled by the error callback functions. Therefore, monitor
+ // the exception here
+ if (t instanceof RuntimeException) {
+ errorMsgs.add("Unexpected throwable from callback functions should be" +
+ " ignored by Container " + containerId);
+ }
+ if (containerId.getId() < expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " shouldn't throw the exception onStartContainerError");
+ return;
+ }
+ actualStartFailure.addAndGet(1);
+ actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1); // failure ids start at expectedSuccess
+ // move on to the following failure tests
+ asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
+
+ // Thrown on purpose: an exception from a callback shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @Override
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ if (t instanceof RuntimeException) {
+ errorMsgs.add("Unexpected throwable from callback functions should be" +
+ " ignored by Container " + containerId);
+ }
+ if (containerId.getId() < expectedSuccess + expectedFailure) { // only the third batch of ids may fail to stop
+ errorMsgs.add("Container " + containerId +
+ " shouldn't throw the exception onStopContainerError");
+ return;
+ }
+
+ actualStopFailure.addAndGet(1);
+ actualStopFailureArray.set(
+ containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+ // Thrown on purpose: an exception from a callback shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @Override
+ public void onGetContainerStatusError(ContainerId containerId,
+ Throwable t) {
+ if (t instanceof RuntimeException) {
+ errorMsgs.add("Unexpected throwable from callback functions should be"
+ + " ignored by Container " + containerId);
+ }
+ if (containerId.getId() < expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " shouldn't throw the exception onGetContainerStatusError");
+ return;
+ }
+ actualQueryFailure.addAndGet(1);
+ actualQueryFailureArray.set(containerId.getId() - expectedSuccess, 1);
+
+ // Thrown on purpose: an exception from a callback shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ public boolean isAllSuccessCallsExecuted() { // true once start, query and stop each succeeded expectedSuccess times
+ boolean isAllSuccessCallsExecuted =
+ actualStartSuccess.get() == expectedSuccess &&
+ actualQuerySuccess.get() == expectedSuccess &&
+ actualStopSuccess.get() == expectedSuccess;
+ if (isAllSuccessCallsExecuted) {
+ assertAtomicIntegerArray(actualStartSuccessArray);
+ assertAtomicIntegerArray(actualQuerySuccessArray);
+ assertAtomicIntegerArray(actualStopSuccessArray);
+ }
+ return isAllSuccessCallsExecuted;
+ }
+
+ public boolean isStartAndQueryFailureCallsExecuted() { // true once start and query each failed expectedFailure times
+ boolean isStartAndQueryFailureCallsExecuted =
+ actualStartFailure.get() == expectedFailure &&
+ actualQueryFailure.get() == expectedFailure;
+ if (isStartAndQueryFailureCallsExecuted) {
+ assertAtomicIntegerArray(actualStartFailureArray);
+ assertAtomicIntegerArray(actualQueryFailureArray);
+ }
+ return isStartAndQueryFailureCallsExecuted;
+ }
+
+ public boolean isStopFailureCallsExecuted() { // true once stop failed expectedFailure times
+ boolean isStopFailureCallsExecuted =
+ actualStopFailure.get() == expectedFailure;
+ if (isStopFailureCallsExecuted) {
+ assertAtomicIntegerArray(actualStopFailureArray);
+ }
+ return isStopFailureCallsExecuted;
+ }
+
+ private void assertAtomicIntegerArray(AtomicIntegerArray array) { // asserts that every slot was set to 1
+ for (int i = 0; i < array.length(); ++i) {
+ Assert.assertEquals(1, array.get(i));
+ }
+ }
+ }
+
+ // Builds a mock NMClient. Mode 0: every call succeeds. Mode 1: every call
+ // throws. Mode 2: start and status succeed but stopContainer throws.
+ // Any other mode returns the mock without any stubbing.
+ private NMClient mockNMClient(int mode)
+ throws YarnException, IOException {
+ NMClient client = mock(NMClient.class);
+ if (mode == 0 || mode == 2) {
+ // modes 0 and 2 share the successful start and status stubs
+ when(client.startContainer(any(Container.class),
+ any(ContainerLaunchContext.class))).thenReturn(
+ Collections.<String, ByteBuffer>emptyMap());
+ when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
+ any(Token.class))).thenReturn(
+ recordFactory.newRecordInstance(ContainerStatus.class));
+ if (mode == 0) {
+ // stopping does nothing in mode 0
+ doNothing().when(client).stopContainer(any(ContainerId.class),
+ any(NodeId.class), any(Token.class));
+ } else {
+ // only the stop call throws in mode 2
+ doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
+ .stopContainer(any(ContainerId.class), any(NodeId.class),
+ any(Token.class));
+ }
+ } else if (mode == 1) {
+ // start, status and stop all throw
+ doThrow(RPCUtil.getRemoteException("Start Exception")).when(client)
+ .startContainer(any(Container.class),
+ any(ContainerLaunchContext.class));
+ doThrow(RPCUtil.getRemoteException("Query Exception")).when(client)
+ .getContainerStatus(any(ContainerId.class), any(NodeId.class),
+ any(Token.class));
+ doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
+ .stopContainer(any(ContainerId.class), any(NodeId.class),
+ any(Token.class));
+ }
+ return client;
+ }
+
+ @Test (timeout = 10000)
+ public void testOutOfOrder() throws Exception { // forces the stop event to be processed before the start event
+ CyclicBarrier barrierA = new CyclicBarrier(2);
+ CyclicBarrier barrierB = new CyclicBarrier(2);
+ CyclicBarrier barrierC = new CyclicBarrier(2);
+ asyncClient = new MockNMClientAsync2(barrierA, barrierB, barrierC);
+ asyncClient.init(new Configuration());
+ asyncClient.start();
+
+ final Container container = mockContainer(1);
+ final ContainerLaunchContext clc =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ // start container from another thread
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ asyncClient.startContainerAsync(container, clc);
+ }
+ };
+ t.start();
+
+ barrierA.await(); // meets the START_CONTAINER processor, which then waits on barrierB
+ asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
+ container.getContainerToken());
+ barrierC.await(); // meets onStartContainerError once it saw the stop-before-start message
+
+ Assert.assertFalse("Starting and stopping should be out of order",
+ ((TestCallbackHandler2) asyncClient.getCallbackHandler())
+ .exceptionOccurred.get());
+ }
+
+ private class MockNMClientAsync2 extends NMClientAsyncImpl { // uses barriers to hold back START_CONTAINER until STOP_CONTAINER has run
+ private CyclicBarrier barrierA;
+ private CyclicBarrier barrierB;
+
+ protected MockNMClientAsync2(CyclicBarrier barrierA, CyclicBarrier barrierB,
+ CyclicBarrier barrierC) throws YarnException, IOException {
+ super(MockNMClientAsync2.class.getName(), mockNMClient(0),
+ new TestCallbackHandler2(barrierC)); // barrierC is used only by the callback handler
+ this.barrierA = barrierA;
+ this.barrierB = barrierB;
+ }
+
+ private class MockContainerEventProcessor extends ContainerEventProcessor {
+
+ public MockContainerEventProcessor(ContainerEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (event.getType() == ContainerEventType.START_CONTAINER) {
+ barrierA.await(); // announce that the start event has been picked up
+ barrierB.await(); // hold the start until the stop event has been processed
+ }
+ super.run();
+ if (event.getType() == ContainerEventType.STOP_CONTAINER) {
+ barrierB.await(); // stop has been processed: release the waiting start
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ protected ContainerEventProcessor getContainerEventProcessor(
+ ContainerEvent event) {
+ return new MockContainerEventProcessor(event); // substitute the barrier-aware processor
+ }
+ }
+
+ private class TestCallbackHandler2
+ implements NMClientAsync.CallbackHandler { // only reacts to start errors; all other callbacks are no-ops
+ private CyclicBarrier barrierC;
+ private AtomicBoolean exceptionOccurred = new AtomicBoolean(false); // set when a start error has an unexpected message
+
+ public TestCallbackHandler2(CyclicBarrier barrierC) {
+ this.barrierC = barrierC;
+ }
+
+ @Override
+ public void onContainerStarted(ContainerId containerId,
+ Map<String, ByteBuffer> allServiceResponse) {
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId,
+ ContainerStatus containerStatus) {
+ }
+
+ @Override
+ public void onContainerStopped(ContainerId containerId) {
+ }
+
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ if (!t.getMessage().equals(NMClientAsyncImpl.StatefulContainer
+ .OutOfOrderTransition.STOP_BEFORE_START_ERROR_MSG)) {
+ exceptionOccurred.set(true);
+ return; // returns without reaching barrierC, so a thread waiting there is not released
+ }
+ try {
+ barrierC.await(); // expected error seen: meet the thread waiting on barrierC
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void onGetContainerStatusError(ContainerId containerId,
+ Throwable t) {
+ }
+
+ @Override
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ }
+
+ }
+
+ // Creates a container with id i on localhost:0; as a side effect it also
+ // replaces the shared nodeId and containerToken fields.
+ private Container mockContainer(int i) {
+ long now = System.currentTimeMillis();
+ ApplicationAttemptId attempt = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(now, 1), 1);
+ ContainerId id = ContainerId.newInstance(attempt, i);
+ nodeId = NodeId.newInstance("localhost", 0);
+ // the token is just an empty record
+ containerToken = recordFactory.newRecordInstance(Token.class);
+ return Container.newInstance(id, nodeId, null, null, null, containerToken);
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,619 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestAMRMClient {
+ static Configuration conf = null; // created in setup(); passed to init() of the mini cluster and of the clients in this class
+ static MiniYARNCluster yarnCluster = null; // started in setup(), stopped in tearDown()
+ static YarnClient yarnClient = null; // started in setup(), stopped in tearDown()
+ static List<NodeReport> nodeReports = null; // result of yarnClient.getNodeReports() in setup()
+ static ApplicationAttemptId attemptId = null; // set by startApp() before each test, reset to null by cancelApp()
+ static int nodeCount = 3; // passed to the MiniYARNCluster constructor and compared with getClusterNodeCount()
+
+ static Resource capability; // Resource.newInstance(1024, 1), assigned in setup()
+ static Priority priority; // Priority.newInstance(1), assigned in setup()
+ static String node; // host of the first node report
+ static String rack; // rack name of the first node report
+ static String[] nodes; // single-element array holding node
+ static String[] racks; // single-element array holding rack
+
+  /**
+   * One-time fixture: starts the mini YARN cluster and a {@link YarnClient},
+   * then derives the shared priority, capability, node and rack values that
+   * the tests use when building container requests.
+   *
+   * @throws Exception if the cluster or the client cannot be started or queried
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new YarnConfiguration();
+
+    // Mini cluster first; the client below talks to it.
+    yarnCluster =
+        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    // Client used to submit applications and to query node reports.
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+
+    nodeReports = yarnClient.getNodeReports();
+
+    priority = Priority.newInstance(1);
+    capability = Resource.newInstance(1024, 1);
+
+    // Requests in the tests are placed on the first reported node and its rack.
+    final NodeReport firstNode = nodeReports.get(0);
+    node = firstNode.getNodeId().getHost();
+    rack = firstNode.getRackName();
+    nodes = new String[] { node };
+    racks = new String[] { rack };
+  }
+
+  /**
+   * Per-test fixture: submits a new unmanaged-AM application and waits until
+   * the RM reports it as ACCEPTED, then stores its current attempt id in
+   * {@code attemptId}.
+   *
+   * @throws Exception if submission or polling fails, if the wait is
+   *         interrupted, or if the application reaches a terminal state
+   *         before being accepted
+   */
+  @Before
+  public void startApp() throws Exception {
+    // submit new app
+    GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+    ApplicationId appId = newApp.getApplicationId();
+
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+    // set the application id
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records
+        .newRecord(ContainerLaunchContext.class);
+    appContext.setAMContainerSpec(amContainer);
+    // unmanaged AM
+    appContext.setUnmanagedAM(true);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      YarnApplicationState state = appReport.getYarnApplicationState();
+      if (state == YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        break;
+      }
+      // Fail fast instead of polling forever: this fixture is not covered by
+      // the per-test timeout.
+      if (state == YarnApplicationState.FAILED
+          || state == YarnApplicationState.KILLED
+          || state == YarnApplicationState.FINISHED) {
+        throw new IllegalStateException("Application " + appId
+            + " reached state " + state + " before being accepted");
+      }
+      // Pause between polls so the RM is not hammered by a busy loop.
+      Thread.sleep(100);
+    }
+  }
+
+ @After
+ public void cancelApp() {
+ attemptId = null; // clears the attempt id stored by startApp()
+ }
+
+  /**
+   * One-time cleanup: stops the YARN client and then the mini cluster, each
+   * only if it exists and is currently in the STARTED state.
+   */
+  @AfterClass
+  public static void tearDown() {
+    final boolean clientRunning =
+        yarnClient != null && yarnClient.getServiceState() == STATE.STARTED;
+    if (clientRunning) {
+      yarnClient.stop();
+    }
+
+    // Evaluated after the client has been stopped, as in the original order.
+    final boolean clusterRunning =
+        yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED;
+    if (clusterRunning) {
+      yarnCluster.stop();
+    }
+  }
+
+  /**
+   * Checks which stored requests {@code getMatchingRequests} returns for
+   * various container capabilities: exact matches, ordering of identical
+   * requests, a container larger than every request, and capabilities that
+   * fit only some or none of the requests.
+   *
+   * All checks use JUnit's {@code assertTrue}; the Java {@code assert} keyword
+   * is avoided because it is skipped when assertions are disabled in the JVM.
+   */
+  @Test (timeout=60000)
+  public void testAMRMClientMatchingFit() throws YarnException, IOException {
+    AMRMClient<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = AMRMClient.<StoredContainerRequest>createAMRMClient(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      Resource capability1 = Resource.newInstance(1024, 2);
+      Resource capability2 = Resource.newInstance(1024, 1);
+      Resource capability3 = Resource.newInstance(1000, 2);
+      Resource capability4 = Resource.newInstance(2000, 1);
+      Resource capability5 = Resource.newInstance(1000, 3);
+      Resource capability6 = Resource.newInstance(2000, 1);
+
+      StoredContainerRequest storedContainer1 =
+          new StoredContainerRequest(capability1, nodes, racks, priority);
+      StoredContainerRequest storedContainer2 =
+          new StoredContainerRequest(capability2, nodes, racks, priority);
+      StoredContainerRequest storedContainer3 =
+          new StoredContainerRequest(capability3, nodes, racks, priority);
+      StoredContainerRequest storedContainer4 =
+          new StoredContainerRequest(capability4, nodes, racks, priority);
+      StoredContainerRequest storedContainer5 =
+          new StoredContainerRequest(capability5, nodes, racks, priority);
+      StoredContainerRequest storedContainer6 =
+          new StoredContainerRequest(capability6, nodes, racks, priority);
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer2);
+      amClient.addContainerRequest(storedContainer3);
+      amClient.addContainerRequest(storedContainer4);
+      amClient.addContainerRequest(storedContainer5);
+      amClient.addContainerRequest(storedContainer6);
+
+      // test matching of containers
+      List<? extends Collection<StoredContainerRequest>> matches;
+      StoredContainerRequest storedRequest;
+      // exact match
+      Resource testCapability1 = Resource.newInstance(1024, 2);
+      matches = amClient.getMatchingRequests(priority, node, testCapability1);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      amClient.removeContainerRequest(storedContainer1);
+
+      // exact matching with order maintained
+      Resource testCapability2 = Resource.newInstance(2000, 1);
+      matches = amClient.getMatchingRequests(priority, node, testCapability2);
+      verifyMatches(matches, 2);
+      // must be returned in the order they were made
+      int i = 0;
+      for (StoredContainerRequest storedRequest1 : matches.get(0)) {
+        if (i++ == 0) {
+          assertTrue(storedContainer4 == storedRequest1);
+        } else {
+          assertTrue(storedContainer6 == storedRequest1);
+        }
+      }
+      amClient.removeContainerRequest(storedContainer6);
+
+      // matching with larger container. all requests returned
+      Resource testCapability3 = Resource.newInstance(4000, 4);
+      matches = amClient.getMatchingRequests(priority, node, testCapability3);
+      assertTrue(matches.size() == 4);
+
+      Resource testCapability4 = Resource.newInstance(1024, 2);
+      matches = amClient.getMatchingRequests(priority, node, testCapability4);
+      assertTrue(matches.size() == 2);
+      // verify non-fitting containers are not returned and fitting ones are
+      for (Collection<StoredContainerRequest> testSet : matches) {
+        assertTrue(testSet.size() == 1);
+        StoredContainerRequest testRequest = testSet.iterator().next();
+        assertTrue(testRequest != storedContainer4);
+        assertTrue(testRequest != storedContainer5);
+        assertTrue(testRequest == storedContainer2 ||
+            testRequest == storedContainer3);
+      }
+
+      // a capability too small in memory for every remaining request
+      Resource testCapability5 = Resource.newInstance(512, 4);
+      matches = amClient.getMatchingRequests(priority, node, testCapability5);
+      assertTrue(matches.size() == 0);
+
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+
+  /**
+   * Asserts that {@code matches} holds exactly one collection and that this
+   * collection contains {@code matchSize} requests.
+   *
+   * Uses {@code assertEquals} so that a failure reports the expected and the
+   * actual size instead of a bare assertion error.
+   *
+   * @param matches result of a {@code getMatchingRequests} call
+   * @param matchSize expected number of requests in the single collection
+   */
+  private void verifyMatches(
+      List<? extends Collection<StoredContainerRequest>> matches,
+      int matchSize) {
+    Assert.assertEquals(1, matches.size());
+    Assert.assertEquals(matchSize, matches.get(0).size());
+  }
+
+  /**
+   * A request added with nodes but without racks must be matchable both by
+   * its node and by the rack name, and must stop matching by rack once the
+   * request has been removed.
+   */
+  @Test (timeout=60000)
+  public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
+    AMRMClientImpl<StoredContainerRequest> amClient = null;
+    try {
+      // bring up and register the AM-RM client
+      amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      final Resource requested = Resource.newInstance(1024, 2);
+
+      // racks are deliberately passed as null here
+      final StoredContainerRequest nodeOnlyRequest =
+          new StoredContainerRequest(requested, nodes, null, priority);
+      amClient.addContainerRequest(nodeOnlyRequest);
+
+      // match on the node that was requested explicitly
+      List<? extends Collection<StoredContainerRequest>> found =
+          amClient.getMatchingRequests(priority, node, requested);
+      verifyMatches(found, 1);
+      assertTrue(nodeOnlyRequest == found.get(0).iterator().next());
+
+      // match on the rack, which was not part of the request
+      found = amClient.getMatchingRequests(priority, rack, requested);
+      verifyMatches(found, 1);
+      assertTrue(nodeOnlyRequest == found.get(0).iterator().next());
+
+      // after removal the rack no longer yields any match
+      amClient.removeContainerRequest(nodeOnlyRequest);
+      found = amClient.getMatchingRequests(priority, rack, requested);
+      assertTrue(found.isEmpty());
+
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+
+  /**
+   * Exercises storage of container requests inside the client: counts kept in
+   * {@code remoteRequestsTable}, matching by node / rack / ANY for two
+   * priorities, cleanup after removal, and finally one allocate, match and
+   * release cycle against the RM.
+   */
+  @Test (timeout=60000)
+  public void testAMRMClientMatchStorage() throws YarnException, IOException {
+    AMRMClientImpl<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient =
+          (AMRMClientImpl<StoredContainerRequest>) AMRMClient
+            .<StoredContainerRequest> createAMRMClient(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      Priority priority1 = Records.newRecord(Priority.class);
+      priority1.setPriority(2);
+
+      StoredContainerRequest storedContainer1 =
+          new StoredContainerRequest(capability, nodes, racks, priority);
+      StoredContainerRequest storedContainer2 =
+          new StoredContainerRequest(capability, nodes, racks, priority);
+      StoredContainerRequest storedContainer3 =
+          new StoredContainerRequest(capability, null, null, priority1);
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer2);
+      amClient.addContainerRequest(storedContainer3);
+
+      // test addition and storage
+      int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
+          .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+      assertTrue(containersRequestedAny == 2);
+      containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
+          .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+      assertTrue(containersRequestedAny == 1);
+      List<? extends Collection<StoredContainerRequest>> matches =
+          amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 2);
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(matches, 2);
+      matches =
+          amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
+      verifyMatches(matches, 2);
+      matches = amClient.getMatchingRequests(priority1, rack, capability);
+      assertTrue(matches.isEmpty());
+      matches =
+          amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
+      verifyMatches(matches, 1);
+
+      // test removal
+      amClient.removeContainerRequest(storedContainer3);
+      matches = amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 2);
+      amClient.removeContainerRequest(storedContainer2);
+      matches = amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 1);
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(matches, 1);
+
+      // test matching of containers
+      StoredContainerRequest storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      amClient.removeContainerRequest(storedContainer1);
+      matches =
+          amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
+      assertTrue(matches.isEmpty());
+      matches =
+          amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
+      assertTrue(matches.isEmpty());
+      // 0 requests left. everything got cleaned up
+      assertTrue(amClient.remoteRequestsTable.isEmpty());
+
+      // go through an example allocation, matching and release cycle
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer3);
+      // Two requests were just added, so two containers are expected.
+      final int expectedContainers = 2;
+      // RM should allocate container within 2 calls to allocate()
+      int allocatedContainerCount = 0;
+      int iterationsLeft = 2;
+      while (allocatedContainerCount < expectedContainers
+          && iterationsLeft-- > 0) {
+        AllocateResponse allocResponse = amClient.allocate(0.1f);
+        assertTrue(amClient.ask.size() == 0);
+        assertTrue(amClient.release.size() == 0);
+
+        assertTrue(nodeCount == amClient.getClusterNodeCount());
+        allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+        for (Container container : allocResponse.getAllocatedContainers()) {
+          ContainerRequest expectedRequest =
+              container.getPriority().equals(storedContainer1.getPriority()) ?
+                  storedContainer1 : storedContainer3;
+          matches = amClient.getMatchingRequests(container.getPriority(),
+                                                 ResourceRequest.ANY,
+                                                 container.getResource());
+          // test correct matched container is returned
+          verifyMatches(matches, 1);
+          ContainerRequest matchedRequest = matches.get(0).iterator().next();
+          assertTrue(matchedRequest == expectedRequest);
+
+          // assign this container, use it and release it
+          amClient.releaseAssignedContainer(container.getId());
+        }
+        // Compare against the number of containers this loop is waiting for.
+        // containersRequestedAny still holds the earlier count for priority1
+        // (1), which would skip the pause once a single container arrived.
+        if (allocatedContainerCount < expectedContainers) {
+          // sleep to let NM's heartbeat to RM and trigger allocations
+          sleep(1000);
+        }
+      }
+
+      assertTrue(allocatedContainerCount == expectedContainers);
+      assertTrue(amClient.release.size() == 2);
+      assertTrue(amClient.ask.size() == 0);
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertTrue(amClient.release.size() == 0);
+      assertTrue(amClient.ask.size() == 0);
+      assertTrue(allocResponse.getAllocatedContainers().size() == 0);
+
+
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+
+  /**
+   * End-to-end run: creates and registers an AM-RM client, delegates the
+   * actual checks to {@code testAllocation}, then unregisters. The client is
+   * stopped in all cases if it is still in the STARTED state.
+   */
+  @Test (timeout=60000)
+  public void testAMRMClient() throws YarnException, IOException {
+    AMRMClient<ContainerRequest> amClient = null;
+    try {
+      amClient = AMRMClient.<ContainerRequest>createAMRMClient(attemptId);
+      amClient.init(conf);
+      amClient.start();
+
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      // testAllocation needs the implementation type to reach its internals
+      final AMRMClientImpl<ContainerRequest> clientImpl =
+          (AMRMClientImpl<ContainerRequest>) amClient;
+      testAllocation(clientImpl);
+
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      final boolean stillRunning =
+          amClient != null && amClient.getServiceState() == STATE.STARTED;
+      if (stillRunning) {
+        amClient.stop();
+      }
+    }
+  }
+
+ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
+ throws YarnException, IOException {
+ // setup container request
+
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(amClient.release.size() == 0);
+
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 1));
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 3));
+ amClient.removeContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 2));
+
+ int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
+ .get(node).get(capability).remoteRequest.getNumContainers();
+ int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
+ .get(rack).get(capability).remoteRequest.getNumContainers();
+ int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
+ .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+
+ assertTrue(containersRequestedNode == 2);
+ assertTrue(containersRequestedRack == 2);
+ assertTrue(containersRequestedAny == 2);
+ assertTrue(amClient.ask.size() == 3);
+ assertTrue(amClient.release.size() == 0);
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ int iterationsLeft = 2;
+ Set<ContainerId> releases = new TreeSet<ContainerId>();
+
+ ConcurrentHashMap<String, Token> nmTokens = amClient.getNMTokens();
+ Assert.assertEquals(0, nmTokens.size());
+ HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
+
+ while (allocatedContainerCount < containersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(amClient.release.size() == 0);
+
+ assertTrue(nodeCount == amClient.getClusterNodeCount());
+ allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+ for(Container container : allocResponse.getAllocatedContainers()) {
+ ContainerId rejectContainerId = container.getId();
+ releases.add(rejectContainerId);
+ amClient.releaseAssignedContainer(rejectContainerId);
+ }
+ Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size());
+ Iterator<String> nodeI = nmTokens.keySet().iterator();
+ while (nodeI.hasNext()) {
+ String nodeId = nodeI.next();
+ if (!receivedNMTokens.containsKey(nodeId)) {
+ receivedNMTokens.put(nodeId, nmTokens.get(nodeId));
+ } else {
+ Assert.fail("Received token again for : " + nodeId);
+ }
+ }
+ nodeI = receivedNMTokens.keySet().iterator();
+ while (nodeI.hasNext()) {
+ nmTokens.remove(nodeI.next());
+ }
+
+ if(allocatedContainerCount < containersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(1000);
+ }
+ }
+
+ Assert.assertEquals(0, amClient.getNMTokens().size());
+ // Should receive atleast 1 token
+ Assert.assertTrue(receivedNMTokens.size() > 0
+ && receivedNMTokens.size() <= nodeCount);
+
+ assertTrue(allocatedContainerCount == containersRequestedAny);
+ assertTrue(amClient.release.size() == 2);
+ assertTrue(amClient.ask.size() == 0);
+
+ // need to tell the AMRMClient that we dont need these resources anymore
+ amClient.removeContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 2));
+ assertTrue(amClient.ask.size() == 3);
+ // send 0 container count request for resources that are no longer needed
+ ResourceRequest snoopRequest = amClient.ask.iterator().next();
+ assertTrue(snoopRequest.getNumContainers() == 0);
+
+ // test RPC exception handling
+ amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+ racks, priority, 2));
+ snoopRequest = amClient.ask.iterator().next();
+ assertTrue(snoopRequest.getNumContainers() == 2);
+
+ ApplicationMasterProtocol realRM = amClient.rmClient;
+ try {
+ ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class);
+ when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
+ new Answer<AllocateResponse>() {
+ public AllocateResponse answer(InvocationOnMock invocation)
+ throws Exception {
+ amClient.removeContainerRequest(
+ new ContainerRequest(capability, nodes,
+ racks, priority, 2));
+ throw new Exception();
+ }
+ });
+ amClient.rmClient = mockRM;
+ amClient.allocate(0.1f);
+ }catch (Exception ioe) {}
+ finally {
+ amClient.rmClient = realRM;
+ }
+
+ assertTrue(amClient.release.size() == 2);
+ assertTrue(amClient.ask.size() == 3);
+ snoopRequest = amClient.ask.iterator().next();
+ // verify that the remove request made in between makeRequest and allocate
+ // has not been lost
+ assertTrue(snoopRequest.getNumContainers() == 0);
+
+ iterationsLeft = 2;
+ // do a few iterations to ensure RM is not going send new containers
+ while(!releases.isEmpty() || iterationsLeft-- > 0) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ assertTrue(allocResponse.getAllocatedContainers().size() == 0);
+ if(allocResponse.getCompletedContainersStatuses().size() > 0) {
+ for(ContainerStatus cStatus :allocResponse
+ .getCompletedContainersStatuses()) {
+ if(releases.contains(cStatus.getContainerId())) {
+ assertTrue(cStatus.getState() == ContainerState.COMPLETE);
+ assertTrue(cStatus.getExitStatus() == -100);
+ releases.remove(cStatus.getContainerId());
+ }
+ }
+ }
+ if(iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(1000);
+ }
+ }
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(amClient.release.size() == 0);
+ }
+
+  /**
+   * Pauses the calling thread for {@code sleepTime} milliseconds.
+   *
+   * If the thread is interrupted while sleeping, the stack trace is printed
+   * and the interrupt flag is restored so that callers can still observe the
+   * interruption.
+   *
+   * @param sleepTime time to sleep, in milliseconds
+   */
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      // Thread.sleep clears the flag when it throws; put it back.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks which locations end up in the client's request table when a
+ * container request names hosts and racks and a custom
+ * {@link DNSToSwitchMapping} is configured.
+ */
+public class TestAMRMClientContainerRequest {
+  /**
+   * Adds one request for two hosts and "/rack2" while the configured resolver
+   * answers "/rack1", then expects a resource request to be stored for each
+   * host, for both racks and for {@code ResourceRequest.ANY}.
+   */
+  @Test
+  public void testFillInRacks() {
+    final ApplicationAttemptId attempt =
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0L, 0), 0);
+    AMRMClientImpl<ContainerRequest> client =
+        new AMRMClientImpl<ContainerRequest>(attempt);
+
+    Configuration conf = new Configuration();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    client.init(conf);
+
+    final String[] hosts = new String[] {"host1", "host2"};
+    final String[] requestedRacks = new String[] {"/rack2"};
+    ContainerRequest request = new ContainerRequest(
+        Resource.newInstance(1024, 1), hosts, requestedRacks,
+        Priority.newInstance(1), 4);
+    client.addContainerRequest(request);
+
+    // Same locations, in the same order, as individual verification calls.
+    final String[] expectedLocations = new String[] {
+        "host1", "host2", "/rack1", "/rack2", ResourceRequest.ANY};
+    for (String location : expectedLocations) {
+      verifyResourceRequestLocation(client, request, location);
+    }
+  }
+
+  /** Resolver that answers "/rack1" regardless of the names it is given. */
+  private static class MyResolver implements DNSToSwitchMapping {
+
+    @Override
+    public List<String> resolve(List<String> names) {
+      return Arrays.asList("/rack1");
+    }
+
+    @Override
+    public void reloadCachedMappings() {}
+  }
+
+  /**
+   * Looks up the resource request stored for {@code location} under the
+   * priority and capability of {@code request}, and asserts that it carries
+   * that location name and the container count of {@code request}.
+   */
+  private void verifyResourceRequestLocation(
+      AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
+      String location) {
+    final ResourceRequest stored = client.remoteRequestsTable
+        .get(request.getPriority())
+        .get(location)
+        .get(request.getCapability())
+        .remoteRequest;
+    assertEquals(location, stored.getResourceName());
+    assertEquals(request.getContainerCount(), stored.getNumContainers());
+  }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNMClient {
+ Configuration conf = null;
+ MiniYARNCluster yarnCluster = null;
+ YarnClientImpl yarnClient = null;
+ AMRMClientImpl<ContainerRequest> rmClient = null;
+ NMClientImpl nmClient = null;
+ List<NodeReport> nodeReports = null;
+ ApplicationAttemptId attemptId = null;
+ int nodeCount = 3;
+
+  /**
+   * Builds the fixture shared by every test: a started mini YARN cluster, a
+   * started YARN client, an unmanaged-AM application that has reached the
+   * ACCEPTED state, and started AM-side RM and NM clients.
+   *
+   * @throws YarnException if a call to the cluster fails
+   * @throws IOException if a call to the cluster fails
+   */
+  @Before
+  public void setup() throws YarnException, IOException {
+    // start minicluster, named after this test class
+    conf = new YarnConfiguration();
+    yarnCluster =
+        new MiniYARNCluster(TestNMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+    assertEquals(STATE.STARTED, yarnCluster.getServiceState());
+
+    // start rm client
+    yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+    assertEquals(STATE.STARTED, yarnClient.getServiceState());
+
+    // get node info
+    nodeReports = yarnClient.getNodeReports();
+
+    // submit new app
+    GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+    ApplicationId appId = newApp.getApplicationId();
+
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+    // set the application id
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Priority.newInstance(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records
+        .newRecord(ContainerLaunchContext.class);
+    appContext.setAMContainerSpec(amContainer);
+    // unmanaged AM
+    appContext.setUnmanagedAM(true);
+    // Submit the application; the client takes the submission context
+    // directly, so no SubmitApplicationRequest has to be built here
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start: poll once per second, at most 30 times
+    int iterationsLeft = 30;
+    while (iterationsLeft > 0) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() ==
+          YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        break;
+      }
+      sleep(1000);
+      --iterationsLeft;
+    }
+    // the counter only reaches zero when ACCEPTED was never observed
+    if (iterationsLeft == 0) {
+      fail("Application hasn't been started");
+    }
+
+    // start am rm client
+    rmClient =
+        (AMRMClientImpl<ContainerRequest>) AMRMClient
+            .<ContainerRequest> createAMRMClient(attemptId);
+    rmClient.init(conf);
+    rmClient.start();
+    assertEquals(STATE.STARTED, rmClient.getServiceState());
+
+    // start am nm client
+    nmClient = (NMClientImpl) NMClient.createNMClient();
+    nmClient.init(conf);
+    nmClient.start();
+    assertEquals(STATE.STARTED, nmClient.getServiceState());
+  }
+
+  /**
+   * Stops the AM-RM client, the YARN client and the mini cluster, in that
+   * order. Each reference is null-checked because JUnit still runs this
+   * method when setup() failed part-way, and the nested finally blocks keep
+   * one failing stop() from skipping the ones after it.
+   */
+  @After
+  public void tearDown() {
+    try {
+      if (rmClient != null) {
+        rmClient.stop();
+      }
+    } finally {
+      try {
+        if (yarnClient != null) {
+          yarnClient.stop();
+        }
+      } finally {
+        if (yarnCluster != null) {
+          yarnCluster.stop();
+        }
+      }
+    }
+  }
+
+ private void stopNmClient(boolean stopContainers) {
+ assertNotNull("Null nmClient", nmClient);
+ // leave one unclosed
+ assertEquals(1, nmClient.startedContainers.size());
+ // default true
+ assertTrue(nmClient.getCleanupRunningContainers().get());
+ nmClient.cleanupRunningContainersOnStop(stopContainers);
+ assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
+ nmClient.stop();
+ }
+
+  /**
+   * Stops the NM client with cleanup disabled and checks that the container
+   * left running is still recorded in startedContainers until
+   * cleanupRunningContainers() is invoked explicitly.
+   */
+  @Test (timeout = 60000)
+  public void testNMClientNoCleanupOnStop()
+      throws YarnException, IOException {
+    rmClient.registerApplicationMaster("Host", 10000, "");
+
+    Set<Container> allocated = allocateContainers(rmClient, 5);
+    testContainerManagement(nmClient, allocated);
+
+    rmClient.unregisterApplicationMaster(
+        FinalApplicationStatus.SUCCEEDED, null, null);
+
+    // stop the client but leave its containers alone
+    stopNmClient(false);
+    assertFalse(nmClient.startedContainers.isEmpty());
+
+    // an explicit cleanup must leave nothing recorded as started
+    nmClient.cleanupRunningContainers();
+    assertEquals(0, nmClient.startedContainers.size());
+  }
+
+  /**
+   * Runs the container management scenario and then stops the NM client
+   * with the cleanup-on-stop flag switched on while a container is still
+   * recorded as started.
+   */
+  @Test (timeout = 60000)
+  public void testNMClient()
+      throws YarnException, IOException {
+    rmClient.registerApplicationMaster("Host", 10000, "");
+
+    Set<Container> allocated = allocateContainers(rmClient, 5);
+    testContainerManagement(nmClient, allocated);
+
+    rmClient.unregisterApplicationMaster(
+        FinalApplicationStatus.SUCCEEDED, null, null);
+
+    // at least one container must still be recorded as started ...
+    assertFalse(nmClient.startedContainers.isEmpty());
+    // ... and stop() is asked to clean up whatever is left
+    nmClient.cleanupRunningContainersOnStop(true);
+    assertTrue(nmClient.getCleanupRunningContainers().get());
+    nmClient.stop();
+  }
+
+  /**
+   * Queues <code>num</code> single-container requests that name the first
+   * reported node and its rack, then calls allocate() at most twice to
+   * collect the containers the RM hands out.
+   *
+   * @param rmClient client used to queue the requests and to allocate
+   * @param num number of container requests to add
+   * @return the containers received; may be fewer than requested when the RM
+   *         has not allocated them all within the two calls
+   */
+  private Set<Container> allocateContainers(
+      AMRMClientImpl<ContainerRequest> rmClient, int num)
+      throws YarnException, IOException {
+    // setup container request
+    Resource capability = Resource.newInstance(1024, 0);
+    Priority priority = Priority.newInstance(0);
+    NodeReport firstNode = nodeReports.get(0);
+    String[] nodes = new String[] {firstNode.getNodeId().getHost()};
+    String[] racks = new String[] {nodeReports.get(0).getRackName()};
+
+    int queued = 0;
+    while (queued < num) {
+      rmClient.addContainerRequest(
+          new ContainerRequest(capability, nodes, racks, priority, 1));
+      ++queued;
+    }
+
+    // number of containers the client is now asking for at ANY location
+    int wanted = rmClient.remoteRequestsTable
+        .get(priority)
+        .get(ResourceRequest.ANY)
+        .get(capability)
+        .remoteRequest
+        .getNumContainers();
+
+    // RM should allocate container within 2 calls to allocate()
+    Set<Container> containers = new TreeSet<Container>();
+    int received = 0;
+    for (int attempt = 0; attempt < 2 && received < wanted; ++attempt) {
+      AllocateResponse allocResponse = rmClient.allocate(0.1f);
+
+      received += allocResponse.getAllocatedContainers().size();
+      for (Container granted : allocResponse.getAllocatedContainers()) {
+        containers.add(granted);
+      }
+      if (received < wanted) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(1000);
+      }
+    }
+    return containers;
+  }
+
+ private void testContainerManagement(NMClientImpl nmClient,
+ Set<Container> containers) throws YarnException, IOException {
+ int size = containers.size();
+ int i = 0;
+ for (Container container : containers) {
+ // getContainerStatus shouldn't be called before startContainer,
+ // otherwise, NodeManager cannot find the container
+ try {
+ nmClient.getContainerStatus(container.getId(), container.getNodeId(),
+ container.getContainerToken());
+ fail("Exception is expected");
+ } catch (YarnException e) {
+ assertTrue("The thrown exception is not expected",
+ e.getMessage().contains("is not handled by this NodeManager"));
+ }
+
+ // stopContainer shouldn't be called before startContainer,
+ // otherwise, an exception will be thrown
+ try {
+ nmClient.stopContainer(container.getId(), container.getNodeId(),
+ container.getContainerToken());
+ fail("Exception is expected");
+ } catch (YarnException e) {
+ if (!e.getMessage()
+ .contains("is either not started yet or already stopped")) {
+ throw (AssertionError)
+ (new AssertionError("Exception is not expected: " + e).initCause(
+ e));
+ }
+ }
+
+ Credentials ts = new Credentials();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ts.writeTokenStorageToStream(dob);
+ ByteBuffer securityTokens =
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ ContainerLaunchContext clc =
+ Records.newRecord(ContainerLaunchContext.class);
+ clc.setTokens(securityTokens);
+ try {
+ nmClient.startContainer(container, clc);
+ } catch (YarnException e) {
+ throw (AssertionError)
+ (new AssertionError("Exception is not expected: " + e).initCause(e));
+ }
+
+ // leave one container unclosed
+ if (++i < size) {
+ // NodeManager may still need some time to make the container started
+ testGetContainerStatus(container, i, ContainerState.RUNNING, "",
+ -1000);
+
+ try {
+ nmClient.stopContainer(container.getId(), container.getNodeId(),
+ container.getContainerToken());
+ } catch (YarnException e) {
+ throw (AssertionError)
+ (new AssertionError("Exception is not expected: " + e)
+ .initCause(e));
+ }
+
+ // getContainerStatus can be called after stopContainer
+ testGetContainerStatus(container, i, ContainerState.COMPLETE,
+ "Container killed by the ApplicationMaster.", 143);
+ }
+ }
+ }
+
+  /**
+   * Pauses the calling thread for <code>sleepTime</code> milliseconds. If the
+   * thread is interrupted the pause ends early and the interrupt flag is set
+   * again, so the caller can still observe the interruption.
+   *
+   * @param sleepTime time to sleep, in milliseconds
+   */
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      // Thread.sleep cleared the flag when it threw; restore it rather than
+      // swallowing the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Polls the NM client every 100 ms until the container reports
+   * <code>state</code>, then checks the reported container id, diagnostics
+   * and exit status. If the thread is interrupted while waiting (for
+   * example by a test timeout), the interrupt flag is restored and the test
+   * fails instead of polling forever.
+   *
+   * @param container container whose status is queried
+   * @param index number used only to label the diagnostics assertion message
+   * @param state state to wait for
+   * @param diagnostics text the reported diagnostics must contain
+   * @param exitStatus exit status the container must report
+   */
+  private void testGetContainerStatus(Container container, int index,
+      ContainerState state, String diagnostics, int exitStatus)
+      throws YarnException, IOException {
+    while (true) {
+      ContainerStatus status = nmClient.getContainerStatus(
+          container.getId(), container.getNodeId(),
+          container.getContainerToken());
+      // NodeManager may still need some time to get the stable
+      // container status
+      if (status.getState() == state) {
+        assertEquals(container.getId(), status.getContainerId());
+        assertTrue("" + index + ": " + status.getDiagnostics(),
+            status.getDiagnostics().contains(diagnostics));
+        assertEquals(exitStatus, status.getExitStatus());
+        return;
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // Stop waiting: restore the flag and report the aborted wait.
+        Thread.currentThread().interrupt();
+        fail("Interrupted while waiting for container " + container.getId()
+            + " to reach state " + state);
+      }
+    }
+  }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,177 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+public class TestYarnClient {
+
+  /** Placeholder test: the body is empty, so it makes no assertions. */
+  @Test
+  public void test() {
+    // More to come later.
+  }
+
+  /**
+   * Starts a ResourceManager, then checks that a YarnClient built from the
+   * same configuration can be initialised, started and stopped without
+   * throwing.
+   */
+  @Test
+  public void testClientStop() {
+    Configuration conf = new Configuration();
+    ResourceManager rm = new ResourceManager();
+    rm.init(conf);
+    rm.start();
+    try {
+      YarnClient client = YarnClient.createYarnClient();
+      client.init(conf);
+      client.start();
+      client.stop();
+    } finally {
+      // Always shut the RM down so it does not outlive this test.
+      rm.stop();
+    }
+  }
+
+  /**
+   * Submits one application per entry of a list of application states
+   * through the mocked client. The mock is primed before each submission to
+   * report three earlier states followed by that entry, and the test checks
+   * that the report's state has been read four more times after each
+   * submission.
+   */
+  @Test (timeout = 30000)
+  public void testSubmitApplication() {
+    Configuration conf = new Configuration();
+    conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
+        100); // speed up tests
+    final MockYarnClient mockClient = new MockYarnClient();
+    final YarnClient client = mockClient;
+    client.init(conf);
+    client.start();
+
+    YarnApplicationState[] exitStates = {
+      YarnApplicationState.SUBMITTED,
+      YarnApplicationState.ACCEPTED,
+      YarnApplicationState.RUNNING,
+      YarnApplicationState.FINISHED,
+      YarnApplicationState.FAILED,
+      YarnApplicationState.KILLED
+    };
+    int submitted = 0;
+    for (YarnApplicationState exitState : exitStates) {
+      ApplicationSubmissionContext context =
+          mock(ApplicationSubmissionContext.class);
+      ApplicationId applicationId = ApplicationId.newInstance(
+          System.currentTimeMillis(), submitted);
+      when(context.getApplicationId()).thenReturn(applicationId);
+      mockClient.setYarnApplicationState(exitState);
+      try {
+        client.submitApplication(context);
+      } catch (YarnException e) {
+        Assert.fail("Exception is not expected.");
+      } catch (IOException e) {
+        Assert.fail("Exception is not expected.");
+      }
+      ++submitted;
+      // four state reads per submission, accumulated on the same mock
+      verify(mockClient.mockReport, times(4 * submitted))
+          .getYarnApplicationState();
+    }
+
+    client.stop();
+  }
+
+  /**
+   * Submits one application without an explicit type and one with type
+   * "MAPREDUCE" to a MockRM, and checks they report "YARN" and "MAPREDUCE"
+   * respectively. The RM is stopped in a finally block so that a failing
+   * submission or assertion does not leave it running.
+   */
+  @Test(timeout = 30000)
+  public void testApplicationType() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+    try {
+      RMApp app = rm.submitApp(2000);
+      RMApp app1 =
+          rm.submitApp(200, "name", "user",
+              new HashMap<ApplicationAccessType, String>(), false, "default",
+              -1, null, "MAPREDUCE");
+      Assert.assertEquals("YARN", app.getApplicationType());
+      Assert.assertEquals("MAPREDUCE", app1.getApplicationType());
+    } finally {
+      rm.stop();
+    }
+  }
+
+  /**
+   * Submits an application whose type string is longer than 20 characters
+   * and checks that the RM reports only its first 20 characters. The RM is
+   * stopped in a finally block so that a failing submission or assertion
+   * does not leave it running.
+   */
+  @Test(timeout = 30000)
+  public void testApplicationTypeLimit() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+    try {
+      RMApp app1 =
+          rm.submitApp(200, "name", "user",
+              new HashMap<ApplicationAccessType, String>(), false, "default",
+              -1, null, "MAPREDUCE-LENGTH-IS-20");
+      Assert.assertEquals("MAPREDUCE-LENGTH-IS-", app1.getApplicationType());
+    } finally {
+      rm.stop();
+    }
+  }
+
+  /**
+   * YarnClientImpl whose RM proxy is replaced by a Mockito mock. Every
+   * getApplicationReport call on that proxy answers with the same mocked
+   * report, and the sequence of states that report returns is set through
+   * {@link #setYarnApplicationState(YarnApplicationState)}.
+   */
+  private static class MockYarnClient extends YarnClientImpl {
+    private ApplicationReport mockReport;
+
+    public MockYarnClient() {
+      super();
+    }
+
+    /** Installs the mocked RM proxy, response and report. */
+    @Override
+    public void start() {
+      mockReport = mock(ApplicationReport.class);
+      GetApplicationReportResponse cannedResponse =
+          mock(GetApplicationReportResponse.class);
+      when(cannedResponse.getApplicationReport()).thenReturn(mockReport);
+
+      rmClient = mock(ApplicationClientProtocol.class);
+      try {
+        when(rmClient.getApplicationReport(
+            any(GetApplicationReportRequest.class)))
+            .thenReturn(cannedResponse);
+      } catch (YarnException e) {
+        Assert.fail("Exception is not expected.");
+      } catch (IOException e) {
+        Assert.fail("Exception is not expected.");
+      }
+    }
+
+    /** Does nothing. */
+    @Override
+    public void stop() {
+    }
+
+    /**
+     * Primes the mocked report to answer successive state queries with NEW,
+     * NEW_SAVING, NEW_SAVING and finally <code>state</code>.
+     */
+    public void setYarnApplicationState(YarnApplicationState state) {
+      when(mockReport.getYarnApplicationState())
+          .thenReturn(
+              YarnApplicationState.NEW,
+              YarnApplicationState.NEW_SAVING,
+              YarnApplicationState.NEW_SAVING,
+              state);
+    }
+  }
+
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java Tue Jun 18 04:02:47 2013
@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Before;
import org.junit.Test;