You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/18 06:02:48 UTC

svn commit: r1494017 [3/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ hadoop-yarn/hadoop-yarn-appl...

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.async.impl;
+
+import static org.mockito.Matchers.anyFloat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestAMRMClientAsync {
+
+  private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testAMRMClientAsync() throws Exception {
+    Configuration conf = new Configuration();
+    final AtomicBoolean heartbeatBlock = new AtomicBoolean(true);
+    List<ContainerStatus> completed1 = Arrays.asList(
+        ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+            ContainerState.COMPLETE, "", 0));
+    List<Container> allocated1 = Arrays.asList(
+        Container.newInstance(null, null, null, null, null, null));
+    final AllocateResponse response1 = createAllocateResponse(
+        new ArrayList<ContainerStatus>(), allocated1, null);
+    final AllocateResponse response2 = createAllocateResponse(completed1,
+        new ArrayList<Container>(), null);
+    final AllocateResponse emptyResponse = createAllocateResponse(
+        new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
+
+    TestCallbackHandler callbackHandler = new TestCallbackHandler();
+    final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+    final AtomicInteger secondHeartbeatSync = new AtomicInteger(0);
+    when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
+      @Override
+      public AllocateResponse answer(InvocationOnMock invocation)
+          throws Throwable {
+        secondHeartbeatSync.incrementAndGet();
+        while(heartbeatBlock.get()) {
+          synchronized(heartbeatBlock) {
+            heartbeatBlock.wait();
+          }
+        }
+        secondHeartbeatSync.incrementAndGet();
+        return response2;
+      }
+    }).thenReturn(emptyResponse);
+    when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
+      .thenReturn(null);
+    when(client.getClusterAvailableResources()).thenAnswer(new Answer<Resource>() {
+      @Override
+      public Resource answer(InvocationOnMock invocation)
+          throws Throwable {
+        // take client lock to simulate behavior of real impl
+        synchronized (client) { 
+          Thread.sleep(10);
+        }
+        return null;
+      }
+    });
+    
+    AMRMClientAsync<ContainerRequest> asyncClient = 
+        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+    asyncClient.init(conf);
+    asyncClient.start();
+    asyncClient.registerApplicationMaster("localhost", 1234, null);
+    
+    // while the CallbackHandler will still only be processing the first response,
+    // heartbeater thread should still be sending heartbeats.
+    // To test this, wait for the second heartbeat to be received. 
+    while (secondHeartbeatSync.get() < 1) {
+      Thread.sleep(10);
+    }
+    
+    // heartbeat will be blocked. make sure we can call client methods at this
+    // time. Checks that heartbeat is not holding onto client lock
+    assert(secondHeartbeatSync.get() < 2);
+    asyncClient.getClusterAvailableResources();
+    // method returned. now unblock heartbeat
+    assert(secondHeartbeatSync.get() < 2);
+    synchronized (heartbeatBlock) {
+      heartbeatBlock.set(false);
+      heartbeatBlock.notifyAll();
+    }
+    
+    // allocated containers should come before completed containers
+    Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
+    
+    // wait for the allocated containers from the first heartbeat's response
+    while (callbackHandler.takeAllocatedContainers() == null) {
+      Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
+      Thread.sleep(10);
+    }
+    
+    // wait for the completed containers from the second heartbeat's response
+    while (callbackHandler.takeCompletedContainers() == null) {
+      Thread.sleep(10);
+    }
+    
+    asyncClient.stop();
+    
+    Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
+    Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
+  }
+
+  @Test(timeout=10000)
+  public void testAMRMClientAsyncException() throws Exception {
+    Configuration conf = new Configuration();
+    TestCallbackHandler callbackHandler = new TestCallbackHandler();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+    String exStr = "TestException";
+    YarnException mockException = mock(YarnException.class);
+    when(mockException.getMessage()).thenReturn(exStr);
+    when(client.allocate(anyFloat())).thenThrow(mockException);
+
+    AMRMClientAsync<ContainerRequest> asyncClient = 
+        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+    asyncClient.init(conf);
+    asyncClient.start();
+    
+    synchronized (callbackHandler.notifier) {
+      asyncClient.registerApplicationMaster("localhost", 1234, null);
+      while(callbackHandler.savedException == null) {
+        try {
+          callbackHandler.notifier.wait();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    
+    Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr));
+    
+    asyncClient.stop();
+    // stopping should have joined all threads and completed all callbacks
+    Assert.assertTrue(callbackHandler.callbackCount == 0);
+  }
+  
+  @Test//(timeout=10000)
+  public void testAMRMClientAsyncReboot() throws Exception {
+    Configuration conf = new Configuration();
+    TestCallbackHandler callbackHandler = new TestCallbackHandler();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+    
+    final AllocateResponse rebootResponse = createAllocateResponse(
+        new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
+    rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
+    when(client.allocate(anyFloat())).thenReturn(rebootResponse);
+    
+    AMRMClientAsync<ContainerRequest> asyncClient = 
+        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+    asyncClient.init(conf);
+    asyncClient.start();
+    
+    synchronized (callbackHandler.notifier) {
+      asyncClient.registerApplicationMaster("localhost", 1234, null);
+      while(callbackHandler.reboot == false) {
+        try {
+          callbackHandler.notifier.wait();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    
+    asyncClient.stop();
+    // stopping should have joined all threads and completed all callbacks
+    Assert.assertTrue(callbackHandler.callbackCount == 0);
+  }
+  
+  private AllocateResponse createAllocateResponse(
+      List<ContainerStatus> completed, List<Container> allocated,
+      List<NMToken> nmTokens) {
+    AllocateResponse response =
+        AllocateResponse.newInstance(0, completed, allocated,
+            new ArrayList<NodeReport>(), null, null, 1, null, nmTokens);
+    return response;
+  }
+
+  public static ContainerId newContainerId(int appId, int appAttemptId,
+      long timestamp, int containerId) {
+    ApplicationId applicationId = ApplicationId.newInstance(timestamp, appId);
+    ApplicationAttemptId applicationAttemptId =
+        ApplicationAttemptId.newInstance(applicationId, appAttemptId);
+    return ContainerId.newInstance(applicationAttemptId, containerId);
+  }
+
+  private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
+    private volatile List<ContainerStatus> completedContainers;
+    private volatile List<Container> allocatedContainers;
+    Exception savedException = null;
+    boolean reboot = false;
+    Object notifier = new Object();
+    
+    int callbackCount = 0;
+    
+    public List<ContainerStatus> takeCompletedContainers() {
+      List<ContainerStatus> ret = completedContainers;
+      if (ret == null) {
+        return null;
+      }
+      completedContainers = null;
+      synchronized (ret) {
+        ret.notify();
+      }
+      return ret;
+    }
+    
+    public List<Container> takeAllocatedContainers() {
+      List<Container> ret = allocatedContainers;
+      if (ret == null) {
+        return null;
+      }
+      allocatedContainers = null;
+      synchronized (ret) {
+        ret.notify();
+      }
+      return ret;
+    }
+    
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+      completedContainers = statuses;
+      // wait for containers to be taken before returning
+      synchronized (completedContainers) {
+        while (completedContainers != null) {
+          try {
+            completedContainers.wait();
+          } catch (InterruptedException ex) {
+            LOG.error("Interrupted during wait", ex);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+      allocatedContainers = containers;
+      // wait for containers to be taken before returning
+      synchronized (allocatedContainers) {
+        while (allocatedContainers != null) {
+          try {
+            allocatedContainers.wait();
+          } catch (InterruptedException ex) {
+            LOG.error("Interrupted during wait", ex);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void onShutdownRequest() {
+      reboot = true;
+      synchronized (notifier) {
+        notifier.notifyAll();        
+      }
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+    @Override
+    public float getProgress() {
+      callbackCount++;
+      return 0.5f;
+    }
+
+    @Override
+    public void onError(Exception e) {
+      savedException = e;
+      synchronized (notifier) {
+        notifier.notifyAll();        
+      }
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.async.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.junit.After;
+import org.junit.Test;
+
+
+public class TestNMClientAsync {
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  private NMClientAsyncImpl asyncClient;
+  private NodeId nodeId;
+  private Token containerToken;
+
+  @After
+  public void teardown() {
+    ServiceOperations.stop(asyncClient);
+  }
+
+  @Test (timeout = 10000)
+  public void testNMClientAsync() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 10);
+
+    // Threads to run are more than the max size of the thread pool
+    int expectedSuccess = 40;
+    int expectedFailure = 40;
+
+    asyncClient = new MockNMClientAsync1(expectedSuccess, expectedFailure);
+    asyncClient.init(conf);
+    Assert.assertEquals("The max thread pool size is not correctly set",
+        10, asyncClient.maxThreadPoolSize);
+    asyncClient.start();
+
+
+    for (int i = 0; i < expectedSuccess + expectedFailure; ++i) {
+      if (i == expectedSuccess) {
+        while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
+            .isAllSuccessCallsExecuted()) {
+          Thread.sleep(10);
+        }
+        asyncClient.setClient(mockNMClient(1));
+      }
+      Container container = mockContainer(i);
+      ContainerLaunchContext clc =
+          recordFactory.newRecordInstance(ContainerLaunchContext.class);
+      asyncClient.startContainerAsync(container, clc);
+    }
+    while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
+        .isStartAndQueryFailureCallsExecuted()) {
+      Thread.sleep(10);
+    }
+    asyncClient.setClient(mockNMClient(2));
+    ((TestCallbackHandler1) asyncClient.getCallbackHandler()).path = false;
+    for (int i = 0; i < expectedFailure; ++i) {
+      Container container = mockContainer(
+          expectedSuccess + expectedFailure + i);
+      ContainerLaunchContext clc =
+          recordFactory.newRecordInstance(ContainerLaunchContext.class);
+      asyncClient.startContainerAsync(container, clc);
+    }
+    while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
+        .isStopFailureCallsExecuted()) {
+      Thread.sleep(10);
+    }
+    for (String errorMsg :
+        ((TestCallbackHandler1) asyncClient.getCallbackHandler())
+            .errorMsgs) {
+      System.out.println(errorMsg);
+    }
+    Assert.assertEquals("Error occurs in CallbackHandler", 0,
+        ((TestCallbackHandler1) asyncClient.getCallbackHandler())
+            .errorMsgs.size());
+    for (String errorMsg : ((MockNMClientAsync1) asyncClient).errorMsgs) {
+      System.out.println(errorMsg);
+    }
+    Assert.assertEquals("Error occurs in ContainerEventProcessor", 0,
+        ((MockNMClientAsync1) asyncClient).errorMsgs.size());
+    // When the callback functions are all executed, the event processor threads
+    // may still not terminate and the containers may still not removed.
+    while (asyncClient.containers.size() > 0) {
+      Thread.sleep(10);
+    }
+    asyncClient.stop();
+    Assert.assertFalse(
+        "The thread of Container Management Event Dispatcher is still alive",
+        asyncClient.eventDispatcherThread.isAlive());
+    Assert.assertTrue("The thread pool is not shut down",
+        asyncClient.threadPool.isShutdown());
+  }
+
+  private class MockNMClientAsync1 extends NMClientAsyncImpl {
+    private Set<String> errorMsgs =
+        Collections.synchronizedSet(new HashSet<String>());
+
+    protected MockNMClientAsync1(int expectedSuccess, int expectedFailure)
+        throws YarnException, IOException {
+      super(MockNMClientAsync1.class.getName(), mockNMClient(0),
+          new TestCallbackHandler1(expectedSuccess, expectedFailure));
+    }
+
+    private class MockContainerEventProcessor extends ContainerEventProcessor {
+      public MockContainerEventProcessor(ContainerEvent event) {
+        super(event);
+      }
+
+      @Override
+      public void run() {
+        try {
+          super.run();
+        } catch (RuntimeException e) {
+          // If the unexpected throwable comes from error callback functions, it
+          // will break ContainerEventProcessor.run(). Therefore, monitor
+          // the exception here
+          errorMsgs.add("Unexpected throwable from callback functions should" +
+              " be ignored by Container " + event.getContainerId());
+        }
+      }
+    }
+
+    @Override
+    protected ContainerEventProcessor getContainerEventProcessor(
+        ContainerEvent event) {
+      return new MockContainerEventProcessor(event);
+    }
+  }
+
+  private class TestCallbackHandler1
+      implements NMClientAsync.CallbackHandler {
+
+    private boolean path = true;
+
+    private int expectedSuccess;
+    private int expectedFailure;
+
+    private AtomicInteger actualStartSuccess = new AtomicInteger(0);
+    private AtomicInteger actualStartFailure = new AtomicInteger(0);
+    private AtomicInteger actualQuerySuccess = new AtomicInteger(0);
+    private AtomicInteger actualQueryFailure = new AtomicInteger(0);
+    private AtomicInteger actualStopSuccess = new AtomicInteger(0);
+    private AtomicInteger actualStopFailure = new AtomicInteger(0);
+
+    private AtomicIntegerArray actualStartSuccessArray;
+    private AtomicIntegerArray actualStartFailureArray;
+    private AtomicIntegerArray actualQuerySuccessArray;
+    private AtomicIntegerArray actualQueryFailureArray;
+    private AtomicIntegerArray actualStopSuccessArray;
+    private AtomicIntegerArray actualStopFailureArray;
+
+    private Set<String> errorMsgs =
+        Collections.synchronizedSet(new HashSet<String>());
+
+    public TestCallbackHandler1(int expectedSuccess, int expectedFailure) {
+      this.expectedSuccess = expectedSuccess;
+      this.expectedFailure = expectedFailure;
+
+      actualStartSuccessArray = new AtomicIntegerArray(expectedSuccess);
+      actualStartFailureArray = new AtomicIntegerArray(expectedFailure);
+      actualQuerySuccessArray = new AtomicIntegerArray(expectedSuccess);
+      actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
+      actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
+      actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      if (path) {
+        if (containerId.getId() >= expectedSuccess) {
+          errorMsgs.add("Container " + containerId +
+              " should throw the exception onContainerStarted");
+          return;
+        }
+        actualStartSuccess.addAndGet(1);
+        actualStartSuccessArray.set(containerId.getId(), 1);
+
+        // move on to the following success tests
+        asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
+      } else {
+        // move on to the following failure tests
+        asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
+      }
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      if (containerId.getId() >= expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " should throw the exception onContainerStatusReceived");
+        return;
+      }
+      actualQuerySuccess.addAndGet(1);
+      actualQuerySuccessArray.set(containerId.getId(), 1);
+      // move on to the following success tests
+      asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      if (containerId.getId() >= expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " should throw the exception onContainerStopped");
+        return;
+      }
+      actualStopSuccess.addAndGet(1);
+      actualStopSuccessArray.set(containerId.getId(), 1);
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      // If the unexpected throwable comes from success callback functions, it
+      // will be handled by the error callback functions. Therefore, monitor
+      // the exception here
+      if (t instanceof RuntimeException) {
+        errorMsgs.add("Unexpected throwable from callback functions should be" +
+            " ignored by Container " + containerId);
+      }
+      if (containerId.getId() < expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception onStartContainerError");
+        return;
+      }
+      actualStartFailure.addAndGet(1);
+      actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
+      // move on to the following failure tests
+      asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      if (t instanceof RuntimeException) {
+        errorMsgs.add("Unexpected throwable from callback functions should be" +
+            " ignored by Container " + containerId);
+      }
+      if (containerId.getId() < expectedSuccess + expectedFailure) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception onStopContainerError");
+        return;
+      }
+
+      actualStopFailure.addAndGet(1);
+      actualStopFailureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @Override
+    public void onGetContainerStatusError(ContainerId containerId,
+        Throwable t) {
+      if (t instanceof RuntimeException) {
+        errorMsgs.add("Unexpected throwable from callback functions should be"
+            + " ignored by Container " + containerId);
+      }
+      if (containerId.getId() < expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception onGetContainerStatusError");
+        return;
+      }
+      actualQueryFailure.addAndGet(1);
+      actualQueryFailureArray.set(containerId.getId() - expectedSuccess, 1);
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    public boolean isAllSuccessCallsExecuted() {
+      boolean isAllSuccessCallsExecuted =
+          actualStartSuccess.get() == expectedSuccess &&
+          actualQuerySuccess.get() == expectedSuccess &&
+          actualStopSuccess.get() == expectedSuccess;
+      if (isAllSuccessCallsExecuted) {
+        assertAtomicIntegerArray(actualStartSuccessArray);
+        assertAtomicIntegerArray(actualQuerySuccessArray);
+        assertAtomicIntegerArray(actualStopSuccessArray);
+      }
+      return isAllSuccessCallsExecuted;
+    }
+
+    public boolean isStartAndQueryFailureCallsExecuted() {
+      boolean isStartAndQueryFailureCallsExecuted =
+          actualStartFailure.get() == expectedFailure &&
+          actualQueryFailure.get() == expectedFailure;
+      if (isStartAndQueryFailureCallsExecuted) {
+        assertAtomicIntegerArray(actualStartFailureArray);
+        assertAtomicIntegerArray(actualQueryFailureArray);
+      }
+      return isStartAndQueryFailureCallsExecuted;
+    }
+
+    public boolean isStopFailureCallsExecuted() {
+      boolean isStopFailureCallsExecuted =
+          actualStopFailure.get() == expectedFailure;
+      if (isStopFailureCallsExecuted) {
+        assertAtomicIntegerArray(actualStopFailureArray);
+      }
+      return isStopFailureCallsExecuted;
+    }
+
+    private void assertAtomicIntegerArray(AtomicIntegerArray array) {
+      for (int i = 0; i < array.length(); ++i) {
+        Assert.assertEquals(1, array.get(i));
+      }
+    }
+  }
+
+  private NMClient mockNMClient(int mode)
+      throws YarnException, IOException {
+    NMClient client = mock(NMClient.class);
+    switch (mode) {
+      case 0:
+        when(client.startContainer(any(Container.class),
+            any(ContainerLaunchContext.class))).thenReturn(
+                Collections.<String, ByteBuffer>emptyMap());
+        when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
+            any(Token.class))).thenReturn(
+                recordFactory.newRecordInstance(ContainerStatus.class));
+        doNothing().when(client).stopContainer(any(ContainerId.class),
+            any(NodeId.class), any(Token.class));
+        break;
+      case 1:
+        doThrow(RPCUtil.getRemoteException("Start Exception")).when(client)
+            .startContainer(any(Container.class),
+                any(ContainerLaunchContext.class));
+        doThrow(RPCUtil.getRemoteException("Query Exception")).when(client)
+            .getContainerStatus(any(ContainerId.class), any(NodeId.class),
+                any(Token.class));
+        doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
+            .stopContainer(any(ContainerId.class), any(NodeId.class),
+                any(Token.class));
+        break;
+      case 2:
+        when(client.startContainer(any(Container.class),
+            any(ContainerLaunchContext.class))).thenReturn(
+                Collections.<String, ByteBuffer>emptyMap());
+        when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
+            any(Token.class))).thenReturn(
+                recordFactory.newRecordInstance(ContainerStatus.class));
+        doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
+            .stopContainer(any(ContainerId.class), any(NodeId.class),
+                any(Token.class));
+    }
+    return client;
+  }
+
+  @Test (timeout = 10000)
+  public void testOutOfOrder() throws Exception {
+    CyclicBarrier barrierA = new CyclicBarrier(2);
+    CyclicBarrier barrierB = new CyclicBarrier(2);
+    CyclicBarrier barrierC = new CyclicBarrier(2);
+    asyncClient = new MockNMClientAsync2(barrierA, barrierB, barrierC);
+    asyncClient.init(new Configuration());
+    asyncClient.start();
+
+    final Container container = mockContainer(1);
+    final ContainerLaunchContext clc =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    // start container from another thread
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        asyncClient.startContainerAsync(container, clc);
+      }
+    };
+    t.start();
+
+    barrierA.await();
+    asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
+        container.getContainerToken());
+    barrierC.await();
+
+    Assert.assertFalse("Starting and stopping should be out of order",
+        ((TestCallbackHandler2) asyncClient.getCallbackHandler())
+            .exceptionOccurred.get());
+  }
+
+  private class MockNMClientAsync2 extends NMClientAsyncImpl {
+    private CyclicBarrier barrierA;
+    private CyclicBarrier barrierB;
+
+    protected MockNMClientAsync2(CyclicBarrier barrierA, CyclicBarrier barrierB,
+        CyclicBarrier barrierC) throws YarnException, IOException {
+      super(MockNMClientAsync2.class.getName(), mockNMClient(0),
+          new TestCallbackHandler2(barrierC));
+      this.barrierA = barrierA;
+      this.barrierB = barrierB;
+    }
+
+    private class MockContainerEventProcessor extends ContainerEventProcessor {
+
+      public MockContainerEventProcessor(ContainerEvent event) {
+        super(event);
+      }
+
+      @Override
+      public void run() {
+        try {
+          if (event.getType() == ContainerEventType.START_CONTAINER) {
+            barrierA.await();
+            barrierB.await();
+          }
+          super.run();
+          if (event.getType() == ContainerEventType.STOP_CONTAINER) {
+            barrierB.await();
+          }
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        } catch (BrokenBarrierException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    @Override
+    protected ContainerEventProcessor getContainerEventProcessor(
+        ContainerEvent event) {
+      return new MockContainerEventProcessor(event);
+    }
+  }
+
+  private class TestCallbackHandler2
+      implements NMClientAsync.CallbackHandler {
+    private CyclicBarrier barrierC;
+    private AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
+
+    public TestCallbackHandler2(CyclicBarrier barrierC) {
+      this.barrierC = barrierC;
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      if (!t.getMessage().equals(NMClientAsyncImpl.StatefulContainer
+          .OutOfOrderTransition.STOP_BEFORE_START_ERROR_MSG)) {
+        exceptionOccurred.set(true);
+        return;
+      }
+      try {
+        barrierC.await();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (BrokenBarrierException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void onGetContainerStatusError(ContainerId containerId,
+        Throwable t) {
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+    }
+
+  }
+
+  private Container mockContainer(int i) {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId = ContainerId.newInstance(attemptId, i);
+    nodeId = NodeId.newInstance("localhost", 0);
+    // Create an empty record
+    containerToken = recordFactory.newRecordInstance(Token.class);
+    return Container.newInstance(containerId, nodeId, null, null, null,
+      containerToken);
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,619 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestAMRMClient {
+  static Configuration conf = null;
+  static MiniYARNCluster yarnCluster = null;
+  static YarnClient yarnClient = null;
+  static List<NodeReport> nodeReports = null;
+  static ApplicationAttemptId attemptId = null;
+  static int nodeCount = 3;
+  
+  static Resource capability;
+  static Priority priority;
+  static String node;
+  static String rack;
+  static String[] nodes;
+  static String[] racks;
+  
+  @BeforeClass
+  public static void setup() throws Exception {
+    // start minicluster
+    conf = new YarnConfiguration();
+    yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    // start rm client
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+
+    // get node info
+    nodeReports = yarnClient.getNodeReports();
+    
+    priority = Priority.newInstance(1);
+    capability = Resource.newInstance(1024, 1);
+
+    node = nodeReports.get(0).getNodeId().getHost();
+    rack = nodeReports.get(0).getRackName();
+    nodes = new String[]{ node };
+    racks = new String[]{ rack };
+  }
+  
+  @Before
+  public void startApp() throws Exception {
+    // submit new app
+    GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+    ApplicationId appId = newApp.getApplicationId();
+
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+    // set the application id
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records
+        .newRecord(ContainerLaunchContext.class);
+    appContext.setAMContainerSpec(amContainer);
+    // unmanaged AM
+    appContext.setUnmanagedAM(true);
+    // Create the request to send to the applications manager
+    SubmitApplicationRequest appRequest = Records
+        .newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        break;
+      }
+    }
+  }
+  
+  @After
+  public void cancelApp() {
+    attemptId = null;
+  }
+  
+  @AfterClass
+  public static void tearDown() {
+    if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
+      yarnClient.stop();
+    }
+    if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) {
+      yarnCluster.stop();
+    }
+  }
+  
+  @Test (timeout=60000)
+  public void testAMRMClientMatchingFit() throws YarnException, IOException {
+    AMRMClient<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = AMRMClient.<StoredContainerRequest>createAMRMClient(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+      
+      Resource capability1 = Resource.newInstance(1024, 2);
+      Resource capability2 = Resource.newInstance(1024, 1);
+      Resource capability3 = Resource.newInstance(1000, 2);
+      Resource capability4 = Resource.newInstance(2000, 1);
+      Resource capability5 = Resource.newInstance(1000, 3);
+      Resource capability6 = Resource.newInstance(2000, 1);
+
+      StoredContainerRequest storedContainer1 = 
+          new StoredContainerRequest(capability1, nodes, racks, priority);
+      StoredContainerRequest storedContainer2 = 
+          new StoredContainerRequest(capability2, nodes, racks, priority);
+      StoredContainerRequest storedContainer3 = 
+          new StoredContainerRequest(capability3, nodes, racks, priority);
+      StoredContainerRequest storedContainer4 = 
+          new StoredContainerRequest(capability4, nodes, racks, priority);
+      StoredContainerRequest storedContainer5 = 
+          new StoredContainerRequest(capability5, nodes, racks, priority);
+      StoredContainerRequest storedContainer6 = 
+          new StoredContainerRequest(capability6, nodes, racks, priority);
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer2);
+      amClient.addContainerRequest(storedContainer3);
+      amClient.addContainerRequest(storedContainer4);
+      amClient.addContainerRequest(storedContainer5);
+      amClient.addContainerRequest(storedContainer6);
+      
+      // test matching of containers
+      List<? extends Collection<StoredContainerRequest>> matches;
+      StoredContainerRequest storedRequest;
+      // exact match
+      Resource testCapability1 = Resource.newInstance(1024,  2);
+      matches = amClient.getMatchingRequests(priority, node, testCapability1);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      amClient.removeContainerRequest(storedContainer1);
+      
+      // exact matching with order maintained
+      Resource testCapability2 = Resource.newInstance(2000, 1);
+      matches = amClient.getMatchingRequests(priority, node, testCapability2);
+      verifyMatches(matches, 2);
+      // must be returned in the order they were made
+      int i = 0;
+      for(StoredContainerRequest storedRequest1 : matches.get(0)) {
+        if(i++ == 0) {
+          assertTrue(storedContainer4 == storedRequest1);
+        } else {
+          assertTrue(storedContainer6 == storedRequest1);
+        }
+      }
+      amClient.removeContainerRequest(storedContainer6);
+      
+      // matching with larger container. all requests returned
+      Resource testCapability3 = Resource.newInstance(4000, 4);
+      matches = amClient.getMatchingRequests(priority, node, testCapability3);
+      assert(matches.size() == 4);
+      
+      Resource testCapability4 = Resource.newInstance(1024, 2);
+      matches = amClient.getMatchingRequests(priority, node, testCapability4);
+      assert(matches.size() == 2);
+      // verify non-fitting containers are not returned and fitting ones are
+      for(Collection<StoredContainerRequest> testSet : matches) {
+        assertTrue(testSet.size() == 1);
+        StoredContainerRequest testRequest = testSet.iterator().next();
+        assertTrue(testRequest != storedContainer4);
+        assertTrue(testRequest != storedContainer5);
+        assert(testRequest == storedContainer2 || 
+                testRequest == storedContainer3);
+      }
+      
+      Resource testCapability5 = Resource.newInstance(512, 4);
+      matches = amClient.getMatchingRequests(priority, node, testCapability5);
+      assert(matches.size() == 0);
+      
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+  
+  private void verifyMatches(
+                  List<? extends Collection<StoredContainerRequest>> matches,
+                  int matchSize) {
+    assertTrue(matches.size() == 1);
+    assertTrue(matches.get(0).size() == matchSize);    
+  }
+  
+  @Test (timeout=60000)
+  public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
+    AMRMClientImpl<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+      
+      Resource capability = Resource.newInstance(1024, 2);
+
+      StoredContainerRequest storedContainer1 = 
+          new StoredContainerRequest(capability, nodes, null, priority);
+      amClient.addContainerRequest(storedContainer1);
+
+      // verify matching with original node and inferred rack
+      List<? extends Collection<StoredContainerRequest>> matches;
+      StoredContainerRequest storedRequest;
+      // exact match node
+      matches = amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      // inferred match rack
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      
+      // inferred rack match no longer valid after request is removed
+      amClient.removeContainerRequest(storedContainer1);
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      assertTrue(matches.isEmpty());
+      
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+
+  @Test (timeout=60000)
+  public void testAMRMClientMatchStorage() throws YarnException, IOException {
+    AMRMClientImpl<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient =
+          (AMRMClientImpl<StoredContainerRequest>) AMRMClient
+            .<StoredContainerRequest> createAMRMClient(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+      
+      Priority priority1 = Records.newRecord(Priority.class);
+      priority1.setPriority(2);
+      
+      StoredContainerRequest storedContainer1 = 
+          new StoredContainerRequest(capability, nodes, racks, priority);
+      StoredContainerRequest storedContainer2 = 
+          new StoredContainerRequest(capability, nodes, racks, priority);
+      StoredContainerRequest storedContainer3 = 
+          new StoredContainerRequest(capability, null, null, priority1);
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer2);
+      amClient.addContainerRequest(storedContainer3);
+      
+      // test addition and storage
+      int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
+       .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+      assertTrue(containersRequestedAny == 2);
+      containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
+          .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+         assertTrue(containersRequestedAny == 1);
+      List<? extends Collection<StoredContainerRequest>> matches = 
+          amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 2);
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(matches, 2);
+      matches = 
+          amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
+      verifyMatches(matches, 2);
+      matches = amClient.getMatchingRequests(priority1, rack, capability);
+      assertTrue(matches.isEmpty());
+      matches = 
+          amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
+      verifyMatches(matches, 1);
+      
+      // test removal
+      amClient.removeContainerRequest(storedContainer3);
+      matches = amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 2);
+      amClient.removeContainerRequest(storedContainer2);
+      matches = amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 1);
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(matches, 1);
+      
+      // test matching of containers
+      StoredContainerRequest storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      amClient.removeContainerRequest(storedContainer1);
+      matches = 
+          amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
+      assertTrue(matches.isEmpty());
+      matches = 
+          amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
+      assertTrue(matches.isEmpty());
+      // 0 requests left. everything got cleaned up
+      assertTrue(amClient.remoteRequestsTable.isEmpty());
+      
+      // go through an exemplary allocation, matching and release cycle
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer3);
+      // RM should allocate container within 2 calls to allocate()
+      int allocatedContainerCount = 0;
+      int iterationsLeft = 2;
+      while (allocatedContainerCount < 2
+          && iterationsLeft-- > 0) {
+        AllocateResponse allocResponse = amClient.allocate(0.1f);
+        assertTrue(amClient.ask.size() == 0);
+        assertTrue(amClient.release.size() == 0);
+        
+        assertTrue(nodeCount == amClient.getClusterNodeCount());
+        allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+        for(Container container : allocResponse.getAllocatedContainers()) {
+          ContainerRequest expectedRequest = 
+              container.getPriority().equals(storedContainer1.getPriority()) ?
+                  storedContainer1 : storedContainer3;
+          matches = amClient.getMatchingRequests(container.getPriority(), 
+                                                 ResourceRequest.ANY, 
+                                                 container.getResource());
+          // test correct matched container is returned
+          verifyMatches(matches, 1);
+          ContainerRequest matchedRequest = matches.get(0).iterator().next();
+          assertTrue(matchedRequest == expectedRequest);
+          
+          // assign this container, use it and release it
+          amClient.releaseAssignedContainer(container.getId());
+        }
+        if(allocatedContainerCount < containersRequestedAny) {
+          // sleep to let NM's heartbeat to RM and trigger allocations
+          sleep(1000);
+        }
+      }
+      
+      assertTrue(allocatedContainerCount == 2);
+      assertTrue(amClient.release.size() == 2);
+      assertTrue(amClient.ask.size() == 0);
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertTrue(amClient.release.size() == 0);
+      assertTrue(amClient.ask.size() == 0);
+      assertTrue(allocResponse.getAllocatedContainers().size() == 0);
+      
+      
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+
+  @Test (timeout=60000)
+  public void testAMRMClient() throws YarnException, IOException {
+    AMRMClient<ContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = AMRMClient.<ContainerRequest>createAMRMClient(attemptId);
+      amClient.init(conf);
+      amClient.start();
+
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      testAllocation((AMRMClientImpl<ContainerRequest>)amClient);
+
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+    
+  private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)  
+      throws YarnException, IOException {
+    // setup container request
+    
+    assertTrue(amClient.ask.size() == 0);
+    assertTrue(amClient.release.size() == 0);
+    
+    amClient.addContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 1));
+    amClient.addContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 3));
+    amClient.removeContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 2));
+    
+    int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
+        .get(node).get(capability).remoteRequest.getNumContainers();
+    int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
+        .get(rack).get(capability).remoteRequest.getNumContainers();
+    int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
+    .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+
+    assertTrue(containersRequestedNode == 2);
+    assertTrue(containersRequestedRack == 2);
+    assertTrue(containersRequestedAny == 2);
+    assertTrue(amClient.ask.size() == 3);
+    assertTrue(amClient.release.size() == 0);
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    int iterationsLeft = 2;
+    Set<ContainerId> releases = new TreeSet<ContainerId>();
+    
+    ConcurrentHashMap<String, Token> nmTokens = amClient.getNMTokens();
+    Assert.assertEquals(0, nmTokens.size());
+    HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
+    
+    while (allocatedContainerCount < containersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertTrue(amClient.ask.size() == 0);
+      assertTrue(amClient.release.size() == 0);
+      
+      assertTrue(nodeCount == amClient.getClusterNodeCount());
+      allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+      for(Container container : allocResponse.getAllocatedContainers()) {
+        ContainerId rejectContainerId = container.getId();
+        releases.add(rejectContainerId);
+        amClient.releaseAssignedContainer(rejectContainerId);
+      }
+      Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size());
+      Iterator<String> nodeI = nmTokens.keySet().iterator();
+      while (nodeI.hasNext()) {
+        String nodeId = nodeI.next();
+        if (!receivedNMTokens.containsKey(nodeId)) {
+          receivedNMTokens.put(nodeId, nmTokens.get(nodeId));
+        } else {
+          Assert.fail("Received token again for : " + nodeId);
+        }
+      }
+      nodeI = receivedNMTokens.keySet().iterator();
+      while (nodeI.hasNext()) {
+        nmTokens.remove(nodeI.next());
+      }
+      
+      if(allocatedContainerCount < containersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(1000);
+      }
+    }
+    
+    Assert.assertEquals(0, amClient.getNMTokens().size());
+    // Should receive atleast 1 token
+    Assert.assertTrue(receivedNMTokens.size() > 0
+        && receivedNMTokens.size() <= nodeCount);
+    
+    assertTrue(allocatedContainerCount == containersRequestedAny);
+    assertTrue(amClient.release.size() == 2);
+    assertTrue(amClient.ask.size() == 0);
+    
+    // need to tell the AMRMClient that we dont need these resources anymore
+    amClient.removeContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 2));
+    assertTrue(amClient.ask.size() == 3);
+    // send 0 container count request for resources that are no longer needed
+    ResourceRequest snoopRequest = amClient.ask.iterator().next();
+    assertTrue(snoopRequest.getNumContainers() == 0);
+    
+    // test RPC exception handling
+    amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 2));
+    snoopRequest = amClient.ask.iterator().next();
+    assertTrue(snoopRequest.getNumContainers() == 2);
+    
+    ApplicationMasterProtocol realRM = amClient.rmClient;
+    try {
+      ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class);
+      when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
+          new Answer<AllocateResponse>() {
+            public AllocateResponse answer(InvocationOnMock invocation)
+                throws Exception {
+              amClient.removeContainerRequest(
+                             new ContainerRequest(capability, nodes, 
+                                                          racks, priority, 2));
+              throw new Exception();
+            }
+          });
+      amClient.rmClient = mockRM;
+      amClient.allocate(0.1f);
+    }catch (Exception ioe) {}
+    finally {
+      amClient.rmClient = realRM;
+    }
+
+    assertTrue(amClient.release.size() == 2);
+    assertTrue(amClient.ask.size() == 3);
+    snoopRequest = amClient.ask.iterator().next();
+    // verify that the remove request made in between makeRequest and allocate 
+    // has not been lost
+    assertTrue(snoopRequest.getNumContainers() == 0);
+    
+    iterationsLeft = 2;
+    // do a few iterations to ensure RM is not going send new containers
+    while(!releases.isEmpty() || iterationsLeft-- > 0) {
+      // inform RM of rejection
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      // RM did not send new containers because AM does not need any
+      assertTrue(allocResponse.getAllocatedContainers().size() == 0);
+      if(allocResponse.getCompletedContainersStatuses().size() > 0) {
+        for(ContainerStatus cStatus :allocResponse
+            .getCompletedContainersStatuses()) {
+          if(releases.contains(cStatus.getContainerId())) {
+            assertTrue(cStatus.getState() == ContainerState.COMPLETE);
+            assertTrue(cStatus.getExitStatus() == -100);
+            releases.remove(cStatus.getContainerId());
+          }
+        }
+      }
+      if(iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(1000);
+      }
+    }
+    assertTrue(amClient.ask.size() == 0);
+    assertTrue(amClient.release.size() == 0);
+  }
+  
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestAMRMClientContainerRequest {
+  @Test
+  public void testFillInRacks() {
+    AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+    
+    Configuration conf = new Configuration();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    client.init(conf);
+ 
+    Resource capability = Resource.newInstance(1024, 1);
+    ContainerRequest request =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            new String[] {"/rack2"}, Priority.newInstance(1), 4);
+    client.addContainerRequest(request);
+    verifyResourceRequestLocation(client, request, "host1");
+    verifyResourceRequestLocation(client, request, "host2");
+    verifyResourceRequestLocation(client, request, "/rack1");
+    verifyResourceRequestLocation(client, request, "/rack2");
+    verifyResourceRequestLocation(client, request, ResourceRequest.ANY);
+  }
+  
+  private static class MyResolver implements DNSToSwitchMapping {
+
+    @Override
+    public List<String> resolve(List<String> names) {
+      return Arrays.asList("/rack1");
+    }
+
+    @Override
+    public void reloadCachedMappings() {}
+  }
+  
+  private void verifyResourceRequestLocation(
+      AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
+      String location) {
+    ResourceRequest ask =  client.remoteRequestsTable.get(request.getPriority())
+        .get(location).get(request.getCapability()).remoteRequest;
+    assertEquals(location, ask.getResourceName());
+    assertEquals(request.getContainerCount(), ask.getNumContainers());
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNMClient {
+  Configuration conf = null;
+  MiniYARNCluster yarnCluster = null;
+  YarnClientImpl yarnClient = null;
+  AMRMClientImpl<ContainerRequest> rmClient = null;
+  NMClientImpl nmClient = null;
+  List<NodeReport> nodeReports = null;
+  ApplicationAttemptId attemptId = null;
+  int nodeCount = 3;
+
+  @Before
+  public void setup() throws YarnException, IOException {
+    // start minicluster
+    conf = new YarnConfiguration();
+    yarnCluster =
+        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+    assertNotNull(yarnCluster);
+    assertEquals(STATE.STARTED, yarnCluster.getServiceState());
+
+    // start rm client
+    yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+    assertNotNull(yarnClient);
+    assertEquals(STATE.STARTED, yarnClient.getServiceState());
+
+    // get node info
+    nodeReports = yarnClient.getNodeReports();
+
+    // submit new app
+    GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+    ApplicationId appId = newApp.getApplicationId();
+
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+    // set the application id
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Priority.newInstance(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records
+        .newRecord(ContainerLaunchContext.class);
+    appContext.setAMContainerSpec(amContainer);
+    // unmanaged AM
+    appContext.setUnmanagedAM(true);
+    // Create the request to send to the applications manager
+    SubmitApplicationRequest appRequest = Records
+        .newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    int iterationsLeft = 30;
+    while (iterationsLeft > 0) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() ==
+          YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        break;
+      }
+      sleep(1000);
+      --iterationsLeft;
+    }
+    if (iterationsLeft == 0) {
+      fail("Application hasn't bee started");
+    }
+
+    // start am rm client
+    rmClient =
+        (AMRMClientImpl<ContainerRequest>) AMRMClient
+          .<ContainerRequest> createAMRMClient(attemptId);
+    rmClient.init(conf);
+    rmClient.start();
+    assertNotNull(rmClient);
+    assertEquals(STATE.STARTED, rmClient.getServiceState());
+
+    // start am nm client
+    nmClient = (NMClientImpl) NMClient.createNMClient();
+    nmClient.init(conf);
+    nmClient.start();
+    assertNotNull(nmClient);
+    assertEquals(STATE.STARTED, nmClient.getServiceState());
+  }
+
+  @After
+  public void tearDown() {
+    rmClient.stop();
+    yarnClient.stop();
+    yarnCluster.stop();
+  }
+
+  private void stopNmClient(boolean stopContainers) {
+    assertNotNull("Null nmClient", nmClient);
+    // leave one unclosed
+    assertEquals(1, nmClient.startedContainers.size());
+    // default true
+    assertTrue(nmClient.getCleanupRunningContainers().get());
+    nmClient.cleanupRunningContainersOnStop(stopContainers);
+    assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
+    nmClient.stop();
+  }
+
+  @Test (timeout = 60000)
+  public void testNMClientNoCleanupOnStop()
+      throws YarnException, IOException {
+
+    rmClient.registerApplicationMaster("Host", 10000, "");
+
+    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
+
+    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+                                         null, null);
+    // don't stop the running containers
+    stopNmClient(false);
+    assertFalse(nmClient.startedContainers. isEmpty());
+    //now cleanup
+    nmClient.cleanupRunningContainers();
+    assertEquals(0, nmClient.startedContainers.size());
+  }
+
+  @Test (timeout = 60000)
+  public void testNMClient()
+      throws YarnException, IOException {
+
+    rmClient.registerApplicationMaster("Host", 10000, "");
+
+    testContainerManagement(nmClient, allocateContainers(rmClient, 5));
+
+    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+        null, null);
+    // stop the running containers on close
+    assertFalse(nmClient.startedContainers.isEmpty());
+    nmClient.cleanupRunningContainersOnStop(true);
+    assertTrue(nmClient.getCleanupRunningContainers().get());
+    nmClient.stop();
+  }
+
+  private Set<Container> allocateContainers(
+      AMRMClientImpl<ContainerRequest> rmClient, int num)
+      throws YarnException, IOException {
+    // setup container request
+    Resource capability = Resource.newInstance(1024, 0);
+    Priority priority = Priority.newInstance(0);
+    String node = nodeReports.get(0).getNodeId().getHost();
+    String rack = nodeReports.get(0).getRackName();
+    String[] nodes = new String[] {node};
+    String[] racks = new String[] {rack};
+
+    for (int i = 0; i < num; ++i) {
+      rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
+          racks, priority, 1));
+    }
+
+    int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
+        .get(ResourceRequest.ANY).get(capability).remoteRequest
+        .getNumContainers();
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    int iterationsLeft = 2;
+    Set<Container> containers = new TreeSet<Container>();
+    while (allocatedContainerCount < containersRequestedAny
+        && iterationsLeft > 0) {
+      AllocateResponse allocResponse = rmClient.allocate(0.1f);
+
+      allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+      for(Container container : allocResponse.getAllocatedContainers()) {
+        containers.add(container);
+      }
+      if(allocatedContainerCount < containersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(1000);
+      }
+
+      --iterationsLeft;
+    }
+    return containers;
+  }
+
+  private void testContainerManagement(NMClientImpl nmClient,
+      Set<Container> containers) throws YarnException, IOException {
+    int size = containers.size();
+    int i = 0;
+    for (Container container : containers) {
+      // getContainerStatus shouldn't be called before startContainer,
+      // otherwise, NodeManager cannot find the container
+      try {
+        nmClient.getContainerStatus(container.getId(), container.getNodeId(),
+            container.getContainerToken());
+        fail("Exception is expected");
+      } catch (YarnException e) {
+        assertTrue("The thrown exception is not expected",
+            e.getMessage().contains("is not handled by this NodeManager"));
+      }
+
+      // stopContainer shouldn't be called before startContainer,
+      // otherwise, an exception will be thrown
+      try {
+        nmClient.stopContainer(container.getId(), container.getNodeId(),
+            container.getContainerToken());
+        fail("Exception is expected");
+      } catch (YarnException e) {
+        if (!e.getMessage()
+              .contains("is either not started yet or already stopped")) {
+          throw (AssertionError)
+            (new AssertionError("Exception is not expected: " + e).initCause(
+              e));
+        }
+      }
+
+      Credentials ts = new Credentials();
+      DataOutputBuffer dob = new DataOutputBuffer();
+      ts.writeTokenStorageToStream(dob);
+      ByteBuffer securityTokens =
+          ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+      ContainerLaunchContext clc =
+          Records.newRecord(ContainerLaunchContext.class);
+      clc.setTokens(securityTokens);
+      try {
+        nmClient.startContainer(container, clc);
+      } catch (YarnException e) {
+        throw (AssertionError)
+          (new AssertionError("Exception is not expected: " + e).initCause(e));
+      }
+
+      // leave one container unclosed
+      if (++i < size) {
+        // NodeManager may still need some time to make the container started
+        testGetContainerStatus(container, i, ContainerState.RUNNING, "",
+            -1000);
+
+        try {
+          nmClient.stopContainer(container.getId(), container.getNodeId(),
+              container.getContainerToken());
+        } catch (YarnException e) {
+          throw (AssertionError)
+            (new AssertionError("Exception is not expected: " + e)
+               .initCause(e));
+        }
+
+        // getContainerStatus can be called after stopContainer
+        testGetContainerStatus(container, i, ContainerState.COMPLETE,
+            "Container killed by the ApplicationMaster.", 143);
+      }
+    }
+  }
+
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void testGetContainerStatus(Container container, int index,
+      ContainerState state, String diagnostics, int exitStatus)
+          throws YarnException, IOException {
+    while (true) {
+      try {
+        ContainerStatus status = nmClient.getContainerStatus(
+            container.getId(), container.getNodeId(),
+                container.getContainerToken());
+        // NodeManager may still need some time to get the stable
+        // container status
+        if (status.getState() == state) {
+          assertEquals(container.getId(), status.getContainerId());
+          assertTrue("" + index + ": " + status.getDiagnostics(),
+              status.getDiagnostics().contains(diagnostics));
+          assertEquals(exitStatus, status.getExitStatus());
+          break;
+        }
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,177 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+public class TestYarnClient {
+
+  @Test
+  public void test() {
+    // More to come later.
+  }
+
+  @Test
+  public void testClientStop() {
+    Configuration conf = new Configuration();
+    ResourceManager rm = new ResourceManager();
+    rm.init(conf);
+    rm.start();
+
+    YarnClient client = YarnClient.createYarnClient();
+    client.init(conf);
+    client.start();
+    client.stop();
+  }
+
+  @Test (timeout = 30000)
+  public void testSubmitApplication() {
+    Configuration conf = new Configuration();
+    conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
+        100); // speed up tests
+    final YarnClient client = new MockYarnClient();
+    client.init(conf);
+    client.start();
+
+    YarnApplicationState[] exitStates = new YarnApplicationState[]
+        {
+          YarnApplicationState.SUBMITTED,
+          YarnApplicationState.ACCEPTED,
+          YarnApplicationState.RUNNING,
+          YarnApplicationState.FINISHED,
+          YarnApplicationState.FAILED,
+          YarnApplicationState.KILLED
+        };
+    for (int i = 0; i < exitStates.length; ++i) {
+      ApplicationSubmissionContext context =
+          mock(ApplicationSubmissionContext.class);
+      ApplicationId applicationId = ApplicationId.newInstance(
+          System.currentTimeMillis(), i);
+      when(context.getApplicationId()).thenReturn(applicationId);
+      ((MockYarnClient) client).setYarnApplicationState(exitStates[i]);
+      try {
+        client.submitApplication(context);
+      } catch (YarnException e) {
+        Assert.fail("Exception is not expected.");
+      } catch (IOException e) {
+        Assert.fail("Exception is not expected.");
+      }
+      verify(((MockYarnClient) client).mockReport,times(4 * i + 4))
+          .getYarnApplicationState();
+    }
+
+    client.stop();
+  }
+
+  @Test(timeout = 30000)
+  public void testApplicationType() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+    RMApp app = rm.submitApp(2000);
+    RMApp app1 =
+        rm.submitApp(200, "name", "user",
+          new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+          null, "MAPREDUCE");
+    Assert.assertEquals("YARN", app.getApplicationType());
+    Assert.assertEquals("MAPREDUCE", app1.getApplicationType());
+    rm.stop();
+  }
+
+  @Test(timeout = 30000)
+  public void testApplicationTypeLimit() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+    RMApp app1 =
+        rm.submitApp(200, "name", "user",
+          new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+          null, "MAPREDUCE-LENGTH-IS-20");
+    Assert.assertEquals("MAPREDUCE-LENGTH-IS-", app1.getApplicationType());
+    rm.stop();
+  }
+  
+  private static class MockYarnClient extends YarnClientImpl {
+    private ApplicationReport mockReport;
+
+    public MockYarnClient() {
+      super();
+    }
+
+    @Override
+    public void start() {
+      rmClient = mock(ApplicationClientProtocol.class);
+      GetApplicationReportResponse mockResponse =
+          mock(GetApplicationReportResponse.class);
+      mockReport = mock(ApplicationReport.class);
+      try{
+        when(rmClient.getApplicationReport(any(
+            GetApplicationReportRequest.class))).thenReturn(mockResponse);
+      } catch (YarnException e) {
+        Assert.fail("Exception is not expected.");
+      } catch (IOException e) {
+        Assert.fail("Exception is not expected.");
+      }
+      when(mockResponse.getApplicationReport()).thenReturn(mockReport);
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    public void setYarnApplicationState(YarnApplicationState state) {
+      when(mockReport.getYarnApplicationState()).thenReturn(
+          YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
+          YarnApplicationState.NEW_SAVING, state);
+    }
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java Tue Jun 18 04:02:47 2013
@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Before;
 import org.junit.Test;