You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/11 19:20:31 UTC

[10/42] hadoop git commit: YARN-2884. Added a proxy service in NM to proxy the the communication between AM and RM. Contributed by Kishore Chaliparambil

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
new file mode 100644
index 0000000..964379a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Base class for all the AMRMProxyService test cases. It provides utility
+ * methods that can be used by the concrete test case classes
+ *
+ */
+public abstract class BaseAMRMProxyTest {
+  private static final Log LOG = LogFactory
+      .getLog(BaseAMRMProxyTest.class);
+  /**
+   * The AMRMProxyService instance that will be used by all the test cases
+   */
+  private MockAMRMProxyService amrmProxyService;
+  /**
+   * Thread pool used for asynchronous operations
+   */
+  private static ExecutorService threadpool = Executors
+      .newCachedThreadPool();
+  private Configuration conf;
+  private AsyncDispatcher dispatcher;
+
+  protected MockAMRMProxyService getAMRMProxyService() {
+    Assert.assertNotNull(this.amrmProxyService);
+    return this.amrmProxyService;
+  }
+
+  @Before
+  public void setUp() {
+    this.conf = new YarnConfiguration();
+    this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    String mockPassThroughInterceptorClass =
+        PassThroughRequestInterceptor.class.getName();
+
+    // Create a request intercepter pipeline for testing. The last one in the
+    // chain will call the mock resource manager. The others in the chain will
+    // simply forward it to the next one in the chain
+    this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + ","
+            + mockPassThroughInterceptorClass + ","
+            + mockPassThroughInterceptorClass + ","
+            + MockRequestInterceptor.class.getName());
+
+    this.dispatcher = new AsyncDispatcher();
+    this.dispatcher.init(conf);
+    this.dispatcher.start();
+    this.amrmProxyService = createAndStartAMRMProxyService();
+  }
+
+  @After
+  public void tearDown() {
+    amrmProxyService.stop();
+    amrmProxyService = null;
+    this.dispatcher.stop();
+  }
+
+  protected ExecutorService getThreadPool() {
+    return threadpool;
+  }
+
+  protected MockAMRMProxyService createAndStartAMRMProxyService() {
+    MockAMRMProxyService svc =
+        new MockAMRMProxyService(new NullContext(), dispatcher);
+    svc.init(conf);
+    svc.start();
+    return svc;
+  }
+
+  /**
+   * This helper method will invoke the specified function in parallel for each
+   * end point in the specified list using a thread pool and return the
+   * responses received from the function. It implements the logic required for
+   * dispatching requests in parallel and waiting for the responses. If any of
+   * the function call fails or times out, it will ignore and proceed with the
+   * rest. So the responses returned can be less than the number of end points
+   * specified
+   * 
+   * @param testContext
+   * @param func
+   * @return
+   */
+  protected <T, R> List<R> runInParallel(List<T> testContexts,
+      final Function<T, R> func) {
+    ExecutorCompletionService<R> completionService =
+        new ExecutorCompletionService<R>(this.getThreadPool());
+    LOG.info("Sending requests to endpoints asynchronously. Number of test contexts="
+        + testContexts.size());
+    for (int index = 0; index < testContexts.size(); index++) {
+      final T testContext = testContexts.get(index);
+
+      LOG.info("Adding request to threadpool for test context: "
+          + testContext.toString());
+
+      completionService.submit(new Callable<R>() {
+        @Override
+        public R call() throws Exception {
+          LOG.info("Sending request. Test context:"
+              + testContext.toString());
+
+          R response = null;
+          try {
+            response = func.invoke(testContext);
+            LOG.info("Successfully sent request for context: "
+                + testContext.toString());
+          } catch (Throwable ex) {
+            LOG.error("Failed to process request for context: "
+                + testContext);
+            response = null;
+          }
+
+          return response;
+        }
+      });
+    }
+
+    ArrayList<R> responseList = new ArrayList<R>();
+    LOG.info("Waiting for responses from endpoints. Number of contexts="
+        + testContexts.size());
+    for (int i = 0; i < testContexts.size(); ++i) {
+      try {
+        final Future<R> future = completionService.take();
+        final R response = future.get(3000, TimeUnit.MILLISECONDS);
+        responseList.add(response);
+      } catch (Throwable e) {
+        LOG.error("Failed to process request " + e.getMessage());
+      }
+    }
+
+    return responseList;
+  }
+
+  /**
+   * Helper method to register an application master using specified testAppId
+   * as the application identifier and return the response
+   * 
+   * @param testAppId
+   * @return
+   * @throws Exception
+   * @throws YarnException
+   * @throws IOException
+   */
+  protected RegisterApplicationMasterResponse registerApplicationMaster(
+      final int testAppId) throws Exception, YarnException, IOException {
+    final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
+
+    return ugi
+        .getUser()
+        .doAs(
+            new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
+              @Override
+              public RegisterApplicationMasterResponse run()
+                  throws Exception {
+                getAMRMProxyService().initApp(
+                    ugi.getAppAttemptId(),
+                    ugi.getUser().getUserName());
+
+                final RegisterApplicationMasterRequest req =
+                    Records
+                        .newRecord(RegisterApplicationMasterRequest.class);
+                req.setHost(Integer.toString(testAppId));
+                req.setRpcPort(testAppId);
+                req.setTrackingUrl("");
+
+                RegisterApplicationMasterResponse response =
+                    getAMRMProxyService().registerApplicationMaster(req);
+                return response;
+              }
+            });
+  }
+
+  /**
+   * Helper method that can be used to register multiple application masters in
+   * parallel to the specified RM end points
+   * 
+   * @param testContexts - used to identify the requests
+   * @return
+   */
+  protected <T> List<RegisterApplicationMasterResponseInfo<T>> registerApplicationMastersInParallel(
+      final ArrayList<T> testContexts) {
+    List<RegisterApplicationMasterResponseInfo<T>> responses =
+        runInParallel(testContexts,
+            new Function<T, RegisterApplicationMasterResponseInfo<T>>() {
+              @Override
+              public RegisterApplicationMasterResponseInfo<T> invoke(
+                  T testContext) {
+                RegisterApplicationMasterResponseInfo<T> response = null;
+                try {
+                  int index = testContexts.indexOf(testContext);
+                  response =
+                      new RegisterApplicationMasterResponseInfo<T>(
+                          registerApplicationMaster(index), testContext);
+                  Assert.assertNotNull(response.getResponse());
+                  Assert.assertEquals(Integer.toString(index), response
+                      .getResponse().getQueue());
+
+                  LOG.info("Sucessfully registered application master with test context: "
+                      + testContext);
+                } catch (Throwable ex) {
+                  response = null;
+                  LOG.error("Failed to register application master with test context: "
+                      + testContext);
+                }
+
+                return response;
+              }
+            });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        testContexts.size(), responses.size());
+
+    Set<T> contextResponses = new TreeSet<T>();
+    for (RegisterApplicationMasterResponseInfo<T> item : responses) {
+      contextResponses.add(item.getTestContext());
+    }
+
+    for (T ep : testContexts) {
+      Assert.assertTrue(contextResponses.contains(ep));
+    }
+
+    return responses;
+  }
+
+  /**
+   * Unregisters the application master for specified application id
+   * 
+   * @param appId
+   * @param status
+   * @return
+   * @throws Exception
+   * @throws YarnException
+   * @throws IOException
+   */
+  protected FinishApplicationMasterResponse finishApplicationMaster(
+      final int appId, final FinalApplicationStatus status)
+      throws Exception, YarnException, IOException {
+
+    final ApplicationUserInfo ugi = getApplicationUserInfo(appId);
+
+    return ugi.getUser().doAs(
+        new PrivilegedExceptionAction<FinishApplicationMasterResponse>() {
+          @Override
+          public FinishApplicationMasterResponse run() throws Exception {
+            final FinishApplicationMasterRequest req =
+                Records.newRecord(FinishApplicationMasterRequest.class);
+            req.setDiagnostics("");
+            req.setTrackingUrl("");
+            req.setFinalApplicationStatus(status);
+
+            FinishApplicationMasterResponse response =
+                getAMRMProxyService().finishApplicationMaster(req);
+
+            getAMRMProxyService().stopApp(
+                ugi.getAppAttemptId().getApplicationId());
+
+            return response;
+          }
+        });
+  }
+
+  protected <T> List<FinishApplicationMasterResponseInfo<T>> finishApplicationMastersInParallel(
+      final ArrayList<T> testContexts) {
+    List<FinishApplicationMasterResponseInfo<T>> responses =
+        runInParallel(testContexts,
+            new Function<T, FinishApplicationMasterResponseInfo<T>>() {
+              @Override
+              public FinishApplicationMasterResponseInfo<T> invoke(
+                  T testContext) {
+                FinishApplicationMasterResponseInfo<T> response = null;
+                try {
+                  response =
+                      new FinishApplicationMasterResponseInfo<T>(
+                          finishApplicationMaster(
+                              testContexts.indexOf(testContext),
+                              FinalApplicationStatus.SUCCEEDED),
+                          testContext);
+                  Assert.assertNotNull(response.getResponse());
+
+                  LOG.info("Sucessfully finished application master with test contexts: "
+                      + testContext);
+                } catch (Throwable ex) {
+                  response = null;
+                  LOG.error("Failed to finish application master with test context: "
+                      + testContext);
+                }
+
+                return response;
+              }
+            });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        testContexts.size(), responses.size());
+
+    Set<T> contextResponses = new TreeSet<T>();
+    for (FinishApplicationMasterResponseInfo<T> item : responses) {
+      Assert.assertNotNull(item);
+      Assert.assertNotNull(item.getResponse());
+      contextResponses.add(item.getTestContext());
+    }
+
+    for (T ep : testContexts) {
+      Assert.assertTrue(contextResponses.contains(ep));
+    }
+
+    return responses;
+  }
+
+  protected AllocateResponse allocate(final int testAppId)
+      throws Exception, YarnException, IOException {
+    final AllocateRequest req = Records.newRecord(AllocateRequest.class);
+    req.setResponseId(testAppId);
+    return allocate(testAppId, req);
+  }
+
+  protected AllocateResponse allocate(final int testAppId,
+      final AllocateRequest request) throws Exception, YarnException,
+      IOException {
+
+    final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
+
+    return ugi.getUser().doAs(
+        new PrivilegedExceptionAction<AllocateResponse>() {
+          @Override
+          public AllocateResponse run() throws Exception {
+            AllocateResponse response =
+                getAMRMProxyService().allocate(request);
+            return response;
+          }
+        });
+  }
+
+  protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) {
+    final ApplicationAttemptId attemptId =
+        getApplicationAttemptId(testAppId);
+
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(attemptId.toString());
+    AMRMTokenIdentifier token = new AMRMTokenIdentifier(attemptId, 1);
+    ugi.addTokenIdentifier(token);
+    return new ApplicationUserInfo(ugi, attemptId);
+  }
+
+  protected List<ResourceRequest> createResourceRequests(String[] hosts,
+      int memory, int vCores, int priority, int containers)
+      throws Exception {
+    return createResourceRequests(hosts, memory, vCores, priority,
+        containers, null);
+  }
+
+  protected List<ResourceRequest> createResourceRequests(String[] hosts,
+      int memory, int vCores, int priority, int containers,
+      String labelExpression) throws Exception {
+    List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
+    for (String host : hosts) {
+      ResourceRequest hostReq =
+          createResourceRequest(host, memory, vCores, priority,
+              containers, labelExpression);
+      reqs.add(hostReq);
+      ResourceRequest rackReq =
+          createResourceRequest("/default-rack", memory, vCores, priority,
+              containers, labelExpression);
+      reqs.add(rackReq);
+    }
+
+    ResourceRequest offRackReq =
+        createResourceRequest(ResourceRequest.ANY, memory, vCores,
+            priority, containers, labelExpression);
+    reqs.add(offRackReq);
+    return reqs;
+  }
+
+  protected ResourceRequest createResourceRequest(String resource,
+      int memory, int vCores, int priority, int containers)
+      throws Exception {
+    return createResourceRequest(resource, memory, vCores, priority,
+        containers, null);
+  }
+
+  protected ResourceRequest createResourceRequest(String resource,
+      int memory, int vCores, int priority, int containers,
+      String labelExpression) throws Exception {
+    ResourceRequest req = Records.newRecord(ResourceRequest.class);
+    req.setResourceName(resource);
+    req.setNumContainers(containers);
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(priority);
+    req.setPriority(pri);
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(memory);
+    capability.setVirtualCores(vCores);
+    req.setCapability(capability);
+    if (labelExpression != null) {
+      req.setNodeLabelExpression(labelExpression);
+    }
+    return req;
+  }
+
+  /**
+   * Returns an ApplicationId with the specified identifier
+   * 
+   * @param testAppId
+   * @return
+   */
+  protected ApplicationId getApplicationId(int testAppId) {
+    return ApplicationId.newInstance(123456, testAppId);
+  }
+
+  /**
+   * Return an instance of ApplicationAttemptId using specified identifier. This
+   * identifier will be used for the ApplicationId too.
+   * 
+   * @param testAppId
+   * @return
+   */
+  protected ApplicationAttemptId getApplicationAttemptId(int testAppId) {
+    return ApplicationAttemptId.newInstance(getApplicationId(testAppId),
+        testAppId);
+  }
+
+  /**
+   * Return an instance of ApplicationAttemptId using specified identifier and
+   * application id
+   * 
+   * @param testAppId
+   * @return
+   */
+  protected ApplicationAttemptId getApplicationAttemptId(int testAppId,
+      ApplicationId appId) {
+    return ApplicationAttemptId.newInstance(appId, testAppId);
+  }
+
+  protected static class RegisterApplicationMasterResponseInfo<T> {
+    private RegisterApplicationMasterResponse response;
+    private T testContext;
+
+    RegisterApplicationMasterResponseInfo(
+        RegisterApplicationMasterResponse response, T testContext) {
+      this.response = response;
+      this.testContext = testContext;
+    }
+
+    public RegisterApplicationMasterResponse getResponse() {
+      return response;
+    }
+
+    public T getTestContext() {
+      return testContext;
+    }
+  }
+
+  protected static class FinishApplicationMasterResponseInfo<T> {
+    private FinishApplicationMasterResponse response;
+    private T testContext;
+
+    FinishApplicationMasterResponseInfo(
+        FinishApplicationMasterResponse response, T testContext) {
+      this.response = response;
+      this.testContext = testContext;
+    }
+
+    public FinishApplicationMasterResponse getResponse() {
+      return response;
+    }
+
+    public T getTestContext() {
+      return testContext;
+    }
+  }
+
+  protected static class ApplicationUserInfo {
+    private UserGroupInformation user;
+    private ApplicationAttemptId attemptId;
+
+    ApplicationUserInfo(UserGroupInformation user,
+        ApplicationAttemptId attemptId) {
+      this.user = user;
+      this.attemptId = attemptId;
+    }
+
+    public UserGroupInformation getUser() {
+      return this.user;
+    }
+
+    public ApplicationAttemptId getAppAttemptId() {
+      return this.attemptId;
+    }
+  }
+
+  protected static class MockAMRMProxyService extends AMRMProxyService {
+    public MockAMRMProxyService(Context nmContext,
+        AsyncDispatcher dispatcher) {
+      super(nmContext, dispatcher);
+    }
+
+    /**
+     * This method is used by the test code to initialize the pipeline. In the
+     * actual service, the initialization is called by the
+     * ContainerManagerImpl::StartContainers method
+     * 
+     * @param applicationId
+     * @param user
+     */
+    public void initApp(ApplicationAttemptId applicationId, String user) {
+      super.initializePipeline(applicationId, user, null, null);
+    }
+
+    public void stopApp(ApplicationId applicationId) {
+      super.stopApplication(applicationId);
+    }
+  }
+
+  /**
+   * The Function interface is used for passing method pointers that can be
+   * invoked asynchronously at a later point.
+   */
+  protected interface Function<T, R> {
+    public R invoke(T input);
+  }
+
+  protected class NullContext implements Context {
+
+    @Override
+    public NodeId getNodeId() {
+      return null;
+    }
+
+    @Override
+    public int getHttpPort() {
+      return 0;
+    }
+
+    @Override
+    public ConcurrentMap<ApplicationId, Application> getApplications() {
+      return null;
+    }
+
+    @Override
+    public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
+      return null;
+    }
+
+    @Override
+    public ConcurrentMap<ContainerId, Container> getContainers() {
+      return null;
+    }
+
+    @Override
+    public NMContainerTokenSecretManager getContainerTokenSecretManager() {
+      return null;
+    }
+
+    @Override
+    public NMTokenSecretManagerInNM getNMTokenSecretManager() {
+      return null;
+    }
+
+    @Override
+    public NodeHealthStatus getNodeHealthStatus() {
+      return null;
+    }
+
+    @Override
+    public ContainerManagementProtocol getContainerManager() {
+      return null;
+    }
+
+    @Override
+    public LocalDirsHandlerService getLocalDirsHandler() {
+      return null;
+    }
+
+    @Override
+    public ApplicationACLsManager getApplicationACLsManager() {
+      return null;
+    }
+
+    @Override
+    public NMStateStoreService getNMStateStore() {
+      return null;
+    }
+
+    @Override
+    public boolean getDecommissioned() {
+      return false;
+    }
+
+    @Override
+    public void setDecommissioned(boolean isDecommissioned) {
+    }
+
+    @Override
+    public ConcurrentLinkedQueue<LogAggregationReport> getLogAggregationStatusForApps() {
+      return null;
+    }
+
+    @Override
+    public NodeResourceMonitor getNodeResourceMonitor() {
+      return null;
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
new file mode 100644
index 0000000..c962f97
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class MockRequestInterceptor extends AbstractRequestInterceptor {
+
+  private MockResourceManagerFacade mockRM;
+
+  public MockRequestInterceptor() {
+  }
+
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    mockRM =
+        new MockResourceManagerFacade(new YarnConfiguration(
+            super.getConf()), 0);
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return mockRM.registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return mockRM.finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    return mockRM.allocate(request);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
new file mode 100644
index 0000000..7573a7a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.directory.api.util.Strings;
+import org.apache.directory.api.util.exception.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.mortbay.log.Log;
+
+/**
+ * Mock Resource Manager facade implementation that exposes all the methods
+ * implemented by the YARN RM. The behavior and the values returned by this mock
+ * implementation is expected by the unit test cases. So please change the
+ * implementation with care.
+ */
+public class MockResourceManagerFacade implements
+    ApplicationMasterProtocol, ApplicationClientProtocol {
+
+  private HashMap<String, List<ContainerId>> applicationContainerIdMap =
+      new HashMap<String, List<ContainerId>>();
+  private HashMap<ContainerId, Container> allocatedContainerMap =
+      new HashMap<ContainerId, Container>();
+  private AtomicInteger containerIndex = new AtomicInteger(0);
+  private Configuration conf;
+
+  public MockResourceManagerFacade(Configuration conf,
+      int startContainerIndex) {
+    this.conf = conf;
+    this.containerIndex.set(startContainerIndex);
+  }
+
+  private static String getAppIdentifier() throws IOException {
+    AMRMTokenIdentifier result = null;
+    UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
+    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
+    for (TokenIdentifier tokenId : tokenIds) {
+      if (tokenId instanceof AMRMTokenIdentifier) {
+        result = (AMRMTokenIdentifier) tokenId;
+        break;
+      }
+    }
+    return result != null ? result.getApplicationAttemptId().toString()
+        : "";
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    String amrmToken = getAppIdentifier();
+    Log.info("Registering application attempt: " + amrmToken);
+
+    synchronized (applicationContainerIdMap) {
+      Assert.assertFalse("The application id is already registered: "
+          + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+      // Keep track of the containers that are returned to this application
+      applicationContainerIdMap.put(amrmToken,
+          new ArrayList<ContainerId>());
+    }
+
+    return RegisterApplicationMasterResponse.newInstance(null, null, null,
+        null, null, request.getHost(), null);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    String amrmToken = getAppIdentifier();
+    Log.info("Finishing application attempt: " + amrmToken);
+
+    synchronized (applicationContainerIdMap) {
+      // Remove the containers that were being tracked for this application
+      Assert.assertTrue("The application id is NOT registered: "
+          + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+      List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken);
+      for (ContainerId c : ids) {
+        allocatedContainerMap.remove(c);
+      }
+    }
+
+    return FinishApplicationMasterResponse
+        .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true
+            : false);
+  }
+
+  protected ApplicationId getApplicationId(int id) {
+    return ApplicationId.newInstance(12345, id);
+  }
+
+  protected ApplicationAttemptId getApplicationAttemptId(int id) {
+    return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    if (request.getAskList() != null && request.getAskList().size() > 0
+        && request.getReleaseList() != null
+        && request.getReleaseList().size() > 0) {
+      Assert.fail("The mock RM implementation does not support receiving "
+          + "askList and releaseList in the same heartbeat");
+    }
+
+    String amrmToken = getAppIdentifier();
+
+    ArrayList<Container> containerList = new ArrayList<Container>();
+    if (request.getAskList() != null) {
+      for (ResourceRequest rr : request.getAskList()) {
+        for (int i = 0; i < rr.getNumContainers(); i++) {
+          ContainerId containerId =
+              ContainerId.newInstance(getApplicationAttemptId(1),
+                  containerIndex.incrementAndGet());
+          Container container = Records.newRecord(Container.class);
+          container.setId(containerId);
+          container.setPriority(rr.getPriority());
+
+          // We don't use the node for running containers in the test cases. So
+          // it is OK to hard code it to some dummy value
+          NodeId nodeId =
+              NodeId.newInstance(
+                  !Strings.isEmpty(rr.getResourceName()) ? rr
+                      .getResourceName() : "dummy", 1000);
+          container.setNodeId(nodeId);
+          container.setResource(rr.getCapability());
+          containerList.add(container);
+
+          synchronized (applicationContainerIdMap) {
+            // Keep track of the containers returned to this application. We
+            // will need it in future
+            Assert.assertTrue(
+                "The application id is Not registered before allocate(): "
+                    + amrmToken,
+                applicationContainerIdMap.containsKey(amrmToken));
+            List<ContainerId> ids =
+                applicationContainerIdMap.get(amrmToken);
+            ids.add(containerId);
+            this.allocatedContainerMap.put(containerId, container);
+          }
+        }
+      }
+    }
+
+    if (request.getReleaseList() != null
+        && request.getReleaseList().size() > 0) {
+      Log.info("Releasing containers: " + request.getReleaseList().size());
+      synchronized (applicationContainerIdMap) {
+        Assert.assertTrue(
+            "The application id is not registered before allocate(): "
+                + amrmToken,
+            applicationContainerIdMap.containsKey(amrmToken));
+        List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
+
+        for (ContainerId id : request.getReleaseList()) {
+          boolean found = false;
+          for (ContainerId c : ids) {
+            if (c.equals(id)) {
+              found = true;
+              break;
+            }
+          }
+
+          Assert.assertTrue(
+              "ContainerId " + id
+                  + " being released is not valid for application: "
+                  + conf.get("AMRMTOKEN"), found);
+
+          ids.remove(id);
+
+          // Return the released container back to the AM with new fake Ids. The
+          // test case does not care about the IDs. The IDs are faked because
+          // otherwise the LRM will throw duplication identifier exception. This
+          // returning of fake containers is ONLY done for testing purpose - for
+          // the test code to get confirmation that the sub-cluster resource
+          // managers received the release request
+          ContainerId fakeContainerId =
+              ContainerId.newInstance(getApplicationAttemptId(1),
+                  containerIndex.incrementAndGet());
+          Container fakeContainer = allocatedContainerMap.get(id);
+          fakeContainer.setId(fakeContainerId);
+          containerList.add(fakeContainer);
+        }
+      }
+    }
+
+    Log.info("Allocating containers: " + containerList.size()
+        + " for application attempt: " + conf.get("AMRMTOKEN"));
+    return AllocateResponse.newInstance(0,
+        new ArrayList<ContainerStatus>(), containerList,
+        new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
+        new ArrayList<NMToken>(),
+        new ArrayList<ContainerResourceIncrease>(),
+        new ArrayList<ContainerResourceDecrease>());
+  }
+
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnException,
+      IOException {
+
+    GetApplicationReportResponse response =
+        Records.newRecord(GetApplicationReportResponse.class);
+    ApplicationReport report = Records.newRecord(ApplicationReport.class);
+    report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
+    report.setApplicationId(request.getApplicationId());
+    report.setCurrentApplicationAttemptId(ApplicationAttemptId
+        .newInstance(request.getApplicationId(), 1));
+    response.setApplicationReport(report);
+    return response;
+  }
+
+  @Override
+  public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      GetApplicationAttemptReportRequest request) throws YarnException,
+      IOException {
+    GetApplicationAttemptReportResponse response =
+        Records.newRecord(GetApplicationAttemptReportResponse.class);
+    ApplicationAttemptReport report =
+        Records.newRecord(ApplicationAttemptReport.class);
+    report.setApplicationAttemptId(request.getApplicationAttemptId());
+    report
+        .setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED);
+    response.setApplicationAttemptReport(report);
+    return response;
+  }
+
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetApplicationsResponse getApplications(
+      GetApplicationsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterNodesResponse getClusterNodes(
+      GetClusterNodesRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetApplicationAttemptsResponse getApplicationAttempts(
+      GetApplicationAttemptsRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetContainerReportResponse getContainerReport(
+      GetContainerReportRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetContainersResponse getContainers(GetContainersRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(
+      GetNodesToLabelsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetLabelsToNodesResponse getLabelsToNodes(
+      GetLabelsToNodesRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public UpdateApplicationPriorityResponse updateApplicationPriority(
+      UpdateApplicationPriorityRequest request) throws YarnException,
+      IOException {
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
new file mode 100644
index 0000000..97a844e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Mock intercepter that does not do anything other than forwarding it to the
+ * next intercepter in the chain
+ *
+ */
+public class PassThroughRequestInterceptor extends
+    AbstractRequestInterceptor {
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return getNextInterceptor().registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return getNextInterceptor().finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    return getNextInterceptor().allocate(request);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
new file mode 100644
index 0000000..69b913a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAMRMProxyService extends BaseAMRMProxyTest {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestAMRMProxyService.class);
+
+  /**
+   * Test if the pipeline is created properly.
+   */
+  @Test
+  public void testRequestInterceptorChainCreation() throws Exception {
+    RequestInterceptor root =
+        super.getAMRMProxyService().createRequestInterceptorChain();
+    int index = 0;
+    while (root != null) {
+      switch (index) {
+      case 0:
+      case 1:
+      case 2:
+        Assert.assertEquals(PassThroughRequestInterceptor.class.getName(),
+            root.getClass().getName());
+        break;
+      case 3:
+        Assert.assertEquals(MockRequestInterceptor.class.getName(), root
+            .getClass().getName());
+        break;
+      }
+
+      root = root.getNextInterceptor();
+      index++;
+    }
+
+    Assert.assertEquals(
+        "The number of interceptors in chain does not match",
+        Integer.toString(4), Integer.toString(index));
+
+  }
+
+  /**
+   * Tests registration of a single application master.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRegisterOneApplicationMaster() throws Exception {
+    // The testAppId identifier is used as host name and the mock resource
+    // manager return it as the queue name. Assert that we received the queue
+    // name
+    int testAppId = 1;
+    RegisterApplicationMasterResponse response1 =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(response1);
+    Assert.assertEquals(Integer.toString(testAppId), response1.getQueue());
+  }
+
+  /**
+   * Tests the registration of multiple application master serially one at a
+   * time.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRegisterMulitpleApplicationMasters() throws Exception {
+    for (int testAppId = 0; testAppId < 3; testAppId++) {
+      RegisterApplicationMasterResponse response =
+          registerApplicationMaster(testAppId);
+      Assert.assertNotNull(response);
+      Assert
+          .assertEquals(Integer.toString(testAppId), response.getQueue());
+    }
+  }
+
+  /**
+   * Tests the registration of multiple application masters using multiple
+   * threads in parallel.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRegisterMulitpleApplicationMastersInParallel()
+      throws Exception {
+    int numberOfRequests = 5;
+    ArrayList<String> testContexts =
+        CreateTestRequestIdentifiers(numberOfRequests);
+    super.registerApplicationMastersInParallel(testContexts);
+  }
+
+  private ArrayList<String> CreateTestRequestIdentifiers(
+      int numberOfRequests) {
+    ArrayList<String> testContexts = new ArrayList<String>();
+    LOG.info("Creating " + numberOfRequests + " contexts for testing");
+    for (int ep = 0; ep < numberOfRequests; ep++) {
+      testContexts.add("test-endpoint-" + Integer.toString(ep));
+      LOG.info("Created test context: " + testContexts.get(ep));
+    }
+    return testContexts;
+  }
+
+  @Test
+  public void testFinishOneApplicationMasterWithSuccess() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId,
+            FinalApplicationStatus.SUCCEEDED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(true, finshResponse.getIsUnregistered());
+  }
+
+  @Test
+  public void testFinishOneApplicationMasterWithFailure() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(false, finshResponse.getIsUnregistered());
+
+    try {
+      // Try to finish an application master that is already finished.
+      finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+  }
+
+  @Test
+  public void testFinishInvalidApplicationMaster() throws Exception {
+    try {
+      // Try to finish an application master that was not registered.
+      finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+  }
+
+  @Test
+  public void testFinishMulitpleApplicationMasters() throws Exception {
+    int numberOfRequests = 3;
+    for (int index = 0; index < numberOfRequests; index++) {
+      RegisterApplicationMasterResponse registerResponse =
+          registerApplicationMaster(index);
+      Assert.assertNotNull(registerResponse);
+      Assert.assertEquals(Integer.toString(index),
+          registerResponse.getQueue());
+    }
+
+    // Finish in reverse sequence
+    for (int index = numberOfRequests - 1; index >= 0; index--) {
+      FinishApplicationMasterResponse finshResponse =
+          finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED);
+
+      Assert.assertNotNull(finshResponse);
+      Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+      // Assert that the application has been removed from the collection
+      Assert.assertTrue(this.getAMRMProxyService()
+          .getPipelines().size() == index);
+    }
+
+    try {
+      // Try to finish an application master that is already finished.
+      finishApplicationMaster(1, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+
+    try {
+      // Try to finish an application master that was not registered.
+      finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+  }
+
+  @Test
+  public void testFinishMulitpleApplicationMastersInParallel()
+      throws Exception {
+    int numberOfRequests = 5;
+    ArrayList<String> testContexts = new ArrayList<String>();
+    LOG.info("Creating " + numberOfRequests + " contexts for testing");
+    for (int i = 0; i < numberOfRequests; i++) {
+      testContexts.add("test-endpoint-" + Integer.toString(i));
+      LOG.info("Created test context: " + testContexts.get(i));
+
+      RegisterApplicationMasterResponse registerResponse =
+          registerApplicationMaster(i);
+      Assert.assertNotNull(registerResponse);
+      Assert
+          .assertEquals(Integer.toString(i), registerResponse.getQueue());
+    }
+
+    finishApplicationMastersInParallel(testContexts);
+  }
+
+  @Test
+  public void testAllocateRequestWithNullValues() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    AllocateResponse allocateResponse = allocate(testAppId);
+    Assert.assertNotNull(allocateResponse);
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId,
+            FinalApplicationStatus.SUCCEEDED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(true, finshResponse.getIsUnregistered());
+  }
+
+  @Test
+  public void testAllocateRequestWithoutRegistering() throws Exception {
+
+    try {
+      // Try to allocate an application master without registering.
+      allocate(1);
+      Assert
+          .fail("The request to allocate application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("AllocateRequest failed as expected because AM was not registered");
+    }
+  }
+
+  @Test
+  public void testAllocateWithOneResourceRequest() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    getContainersAndAssert(testAppId, 1);
+    finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+  }
+
+  @Test
+  public void testAllocateWithMultipleResourceRequest() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    getContainersAndAssert(testAppId, 10);
+    finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+  }
+
+  @Test
+  public void testAllocateAndReleaseContainers() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    List<Container> containers = getContainersAndAssert(testAppId, 10);
+    releaseContainersAndAssert(testAppId, containers);
+    finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+  }
+
+  @Test
+  public void testAllocateAndReleaseContainersForMultipleAM()
+      throws Exception {
+    int numberOfApps = 5;
+    for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
+      RegisterApplicationMasterResponse registerResponse =
+          registerApplicationMaster(testAppId);
+      Assert.assertNotNull(registerResponse);
+      List<Container> containers = getContainersAndAssert(testAppId, 10);
+      releaseContainersAndAssert(testAppId, containers);
+    }
+    for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
+      finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+    }
+  }
+
+  @Test
+  public void testAllocateAndReleaseContainersForMultipleAMInParallel()
+      throws Exception {
+    int numberOfApps = 6;
+    ArrayList<Integer> tempAppIds = new ArrayList<Integer>();
+    for (int i = 0; i < numberOfApps; i++) {
+      tempAppIds.add(new Integer(i));
+    }
+
+    final ArrayList<Integer> appIds = tempAppIds;
+    List<Integer> responses =
+        runInParallel(appIds, new Function<Integer, Integer>() {
+          @Override
+          public Integer invoke(Integer testAppId) {
+            try {
+              RegisterApplicationMasterResponse registerResponse =
+                  registerApplicationMaster(testAppId);
+              Assert.assertNotNull("response is null", registerResponse);
+              List<Container> containers =
+                  getContainersAndAssert(testAppId, 10);
+              releaseContainersAndAssert(testAppId, containers);
+
+              LOG.info("Sucessfully registered application master with appId: "
+                  + testAppId);
+            } catch (Throwable ex) {
+              LOG.error(
+                  "Failed to register application master with appId: "
+                      + testAppId, ex);
+              testAppId = null;
+            }
+
+            return testAppId;
+          }
+        });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        appIds.size(), responses.size());
+
+    for (Integer testAppId : responses) {
+      Assert.assertNotNull(testAppId);
+      finishApplicationMaster(testAppId.intValue(),
+          FinalApplicationStatus.SUCCEEDED);
+    }
+  }
+
+  private List<Container> getContainersAndAssert(int appId,
+      int numberOfResourceRequests) throws Exception {
+    AllocateRequest allocateRequest =
+        Records.newRecord(AllocateRequest.class);
+    allocateRequest.setResponseId(1);
+
+    List<Container> containers =
+        new ArrayList<Container>(numberOfResourceRequests);
+    List<ResourceRequest> askList =
+        new ArrayList<ResourceRequest>(numberOfResourceRequests);
+    for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) {
+      askList.add(createResourceRequest(
+          "test-node-" + Integer.toString(testAppId), 6000, 2,
+          testAppId % 5, 1));
+    }
+
+    allocateRequest.setAskList(askList);
+
+    AllocateResponse allocateResponse = allocate(appId, allocateRequest);
+    Assert.assertNotNull("allocate() returned null response",
+        allocateResponse);
+
+    containers.addAll(allocateResponse.getAllocatedContainers());
+
+    // Send max 10 heart beats to receive all the containers. If not, we will
+    // fail the test
+    int numHeartbeat = 0;
+    while (containers.size() < askList.size() && numHeartbeat++ < 10) {
+      allocateResponse =
+          allocate(appId, Records.newRecord(AllocateRequest.class));
+      Assert.assertNotNull("allocate() returned null response",
+          allocateResponse);
+
+      containers.addAll(allocateResponse.getAllocatedContainers());
+
+      LOG.info("Number of allocated containers in this request: "
+          + Integer.toString(allocateResponse.getAllocatedContainers()
+              .size()));
+      LOG.info("Total number of allocated containers: "
+          + Integer.toString(containers.size()));
+      Thread.sleep(10);
+    }
+
+    // We broadcast the request, the number of containers we received will be
+    // higher than we ask
+    Assert.assertTrue("The asklist count is not same as response",
+        askList.size() <= containers.size());
+    return containers;
+  }
+
+  private void releaseContainersAndAssert(int appId,
+      List<Container> containers) throws Exception {
+    Assert.assertTrue(containers.size() > 0);
+    AllocateRequest allocateRequest =
+        Records.newRecord(AllocateRequest.class);
+    allocateRequest.setResponseId(1);
+
+    List<ContainerId> relList =
+        new ArrayList<ContainerId>(containers.size());
+    for (Container container : containers) {
+      relList.add(container.getId());
+    }
+
+    allocateRequest.setReleaseList(relList);
+
+    AllocateResponse allocateResponse = allocate(appId, allocateRequest);
+    Assert.assertNotNull(allocateResponse);
+
+    // The way the mock resource manager is setup, it will return the containers
+    // that were released in the response. This is done because the UAMs run
+    // asynchronously and we need to if all the resource managers received the
+    // release it. The containers sent by the mock resource managers will be
+    // aggregated and returned back to us and we can assert if all the release
+    // lists reached the sub-clusters
+    List<Container> containersForReleasedContainerIds =
+        new ArrayList<Container>();
+    containersForReleasedContainerIds.addAll(allocateResponse
+        .getAllocatedContainers());
+
+    // Send max 10 heart beats to receive all the containers. If not, we will
+    // fail the test
+    int numHeartbeat = 0;
+    while (containersForReleasedContainerIds.size() < relList.size()
+        && numHeartbeat++ < 10) {
+      allocateResponse =
+          allocate(appId, Records.newRecord(AllocateRequest.class));
+      Assert.assertNotNull(allocateResponse);
+      containersForReleasedContainerIds.addAll(allocateResponse
+          .getAllocatedContainers());
+
+      LOG.info("Number of containers received in this request: "
+          + Integer.toString(allocateResponse.getAllocatedContainers()
+              .size()));
+      LOG.info("Total number of containers received: "
+          + Integer.toString(containersForReleasedContainerIds.size()));
+      Thread.sleep(10);
+    }
+
+    Assert.assertEquals(relList.size(),
+        containersForReleasedContainerIds.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index c8b985d..14142de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -40,9 +40,7 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -76,7 +74,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -95,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -175,69 +173,13 @@ public class ApplicationMasterService extends AbstractService implements
     return this.masterServiceAddress;
   }
 
-  // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
-  // currently sets only the required id, but iterate through anyways just to be
-  // sure.
-  private AMRMTokenIdentifier selectAMRMTokenIdentifier(
-      UserGroupInformation remoteUgi) throws IOException {
-    AMRMTokenIdentifier result = null;
-    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
-    for (TokenIdentifier tokenId : tokenIds) {
-      if (tokenId instanceof AMRMTokenIdentifier) {
-        result = (AMRMTokenIdentifier) tokenId;
-        break;
-      }
-    }
-
-    return result;
-  }
-
-  private AMRMTokenIdentifier authorizeRequest()
-      throws YarnException {
-
-    UserGroupInformation remoteUgi;
-    try {
-      remoteUgi = UserGroupInformation.getCurrentUser();
-    } catch (IOException e) {
-      String msg =
-          "Cannot obtain the user-name for authorizing ApplicationMaster. "
-              + "Got exception: " + StringUtils.stringifyException(e);
-      LOG.warn(msg);
-      throw RPCUtil.getRemoteException(msg);
-    }
-
-    boolean tokenFound = false;
-    String message = "";
-    AMRMTokenIdentifier appTokenIdentifier = null;
-    try {
-      appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
-      if (appTokenIdentifier == null) {
-        tokenFound = false;
-        message = "No AMRMToken found for user " + remoteUgi.getUserName();
-      } else {
-        tokenFound = true;
-      }
-    } catch (IOException e) {
-      tokenFound = false;
-      message =
-          "Got exception while looking for AMRMToken for user "
-              + remoteUgi.getUserName();
-    }
-
-    if (!tokenFound) {
-      LOG.warn(message);
-      throw RPCUtil.getRemoteException(message);
-    }
-
-    return appTokenIdentifier;
-  }
-
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
     ApplicationAttemptId applicationAttemptId =
         amrmTokenIdentifier.getApplicationAttemptId();
 
@@ -346,7 +288,7 @@ public class ApplicationMasterService extends AbstractService implements
       IOException {
 
     ApplicationAttemptId applicationAttemptId =
-        authorizeRequest().getApplicationAttemptId();
+        YarnServerSecurityUtils.authorizeRequest().getApplicationAttemptId();
     ApplicationId appId = applicationAttemptId.getApplicationId();
 
     RMApp rmApp =
@@ -430,7 +372,8 @@ public class ApplicationMasterService extends AbstractService implements
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
 
-    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
 
     ApplicationAttemptId appAttemptId =
         amrmTokenIdentifier.getApplicationAttemptId();