You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/09/08 05:23:47 UTC

[1/2] hadoop git commit: YARN-2884. Added a proxy service in NM to proxy the the communication between AM and RM. Contributed by Kishore Chaliparambil

Repository: hadoop
Updated Branches:
  refs/heads/trunk 9b78e6e33 -> 6f72f1e60


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
new file mode 100644
index 0000000..964379a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Base class for all the AMRMProxyService test cases. It provides utility
+ * methods that can be used by the concrete test case classes
+ *
+ */
+public abstract class BaseAMRMProxyTest {
+  private static final Log LOG = LogFactory
+      .getLog(BaseAMRMProxyTest.class);
+  /**
+   * The AMRMProxyService instance that will be used by all the test cases
+   */
+  private MockAMRMProxyService amrmProxyService;
+  /**
+   * Thread pool used for asynchronous operations
+   */
+  private static ExecutorService threadpool = Executors
+      .newCachedThreadPool();
+  private Configuration conf;
+  private AsyncDispatcher dispatcher;
+
+  protected MockAMRMProxyService getAMRMProxyService() {
+    Assert.assertNotNull(this.amrmProxyService);
+    return this.amrmProxyService;
+  }
+
+  @Before
+  public void setUp() {
+    this.conf = new YarnConfiguration();
+    this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    String mockPassThroughInterceptorClass =
+        PassThroughRequestInterceptor.class.getName();
+
+    // Create a request intercepter pipeline for testing. The last one in the
+    // chain will call the mock resource manager. The others in the chain will
+    // simply forward it to the next one in the chain
+    this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + ","
+            + mockPassThroughInterceptorClass + ","
+            + mockPassThroughInterceptorClass + ","
+            + MockRequestInterceptor.class.getName());
+
+    this.dispatcher = new AsyncDispatcher();
+    this.dispatcher.init(conf);
+    this.dispatcher.start();
+    this.amrmProxyService = createAndStartAMRMProxyService();
+  }
+
+  @After
+  public void tearDown() {
+    amrmProxyService.stop();
+    amrmProxyService = null;
+    this.dispatcher.stop();
+  }
+
+  protected ExecutorService getThreadPool() {
+    return threadpool;
+  }
+
+  protected MockAMRMProxyService createAndStartAMRMProxyService() {
+    MockAMRMProxyService svc =
+        new MockAMRMProxyService(new NullContext(), dispatcher);
+    svc.init(conf);
+    svc.start();
+    return svc;
+  }
+
+  /**
+   * This helper method will invoke the specified function in parallel for each
+   * end point in the specified list using a thread pool and return the
+   * responses received from the function. It implements the logic required for
+   * dispatching requests in parallel and waiting for the responses. If any of
+   * the function call fails or times out, it will ignore and proceed with the
+   * rest. So the responses returned can be less than the number of end points
+   * specified
+   * 
+   * @param testContext
+   * @param func
+   * @return
+   */
+  protected <T, R> List<R> runInParallel(List<T> testContexts,
+      final Function<T, R> func) {
+    ExecutorCompletionService<R> completionService =
+        new ExecutorCompletionService<R>(this.getThreadPool());
+    LOG.info("Sending requests to endpoints asynchronously. Number of test contexts="
+        + testContexts.size());
+    for (int index = 0; index < testContexts.size(); index++) {
+      final T testContext = testContexts.get(index);
+
+      LOG.info("Adding request to threadpool for test context: "
+          + testContext.toString());
+
+      completionService.submit(new Callable<R>() {
+        @Override
+        public R call() throws Exception {
+          LOG.info("Sending request. Test context:"
+              + testContext.toString());
+
+          R response = null;
+          try {
+            response = func.invoke(testContext);
+            LOG.info("Successfully sent request for context: "
+                + testContext.toString());
+          } catch (Throwable ex) {
+            LOG.error("Failed to process request for context: "
+                + testContext);
+            response = null;
+          }
+
+          return response;
+        }
+      });
+    }
+
+    ArrayList<R> responseList = new ArrayList<R>();
+    LOG.info("Waiting for responses from endpoints. Number of contexts="
+        + testContexts.size());
+    for (int i = 0; i < testContexts.size(); ++i) {
+      try {
+        final Future<R> future = completionService.take();
+        final R response = future.get(3000, TimeUnit.MILLISECONDS);
+        responseList.add(response);
+      } catch (Throwable e) {
+        LOG.error("Failed to process request " + e.getMessage());
+      }
+    }
+
+    return responseList;
+  }
+
+  /**
+   * Helper method to register an application master using specified testAppId
+   * as the application identifier and return the response
+   * 
+   * @param testAppId
+   * @return
+   * @throws Exception
+   * @throws YarnException
+   * @throws IOException
+   */
+  protected RegisterApplicationMasterResponse registerApplicationMaster(
+      final int testAppId) throws Exception, YarnException, IOException {
+    final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
+
+    return ugi
+        .getUser()
+        .doAs(
+            new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
+              @Override
+              public RegisterApplicationMasterResponse run()
+                  throws Exception {
+                getAMRMProxyService().initApp(
+                    ugi.getAppAttemptId(),
+                    ugi.getUser().getUserName());
+
+                final RegisterApplicationMasterRequest req =
+                    Records
+                        .newRecord(RegisterApplicationMasterRequest.class);
+                req.setHost(Integer.toString(testAppId));
+                req.setRpcPort(testAppId);
+                req.setTrackingUrl("");
+
+                RegisterApplicationMasterResponse response =
+                    getAMRMProxyService().registerApplicationMaster(req);
+                return response;
+              }
+            });
+  }
+
+  /**
+   * Helper method that can be used to register multiple application masters in
+   * parallel to the specified RM end points
+   * 
+   * @param testContexts - used to identify the requests
+   * @return
+   */
+  protected <T> List<RegisterApplicationMasterResponseInfo<T>> registerApplicationMastersInParallel(
+      final ArrayList<T> testContexts) {
+    List<RegisterApplicationMasterResponseInfo<T>> responses =
+        runInParallel(testContexts,
+            new Function<T, RegisterApplicationMasterResponseInfo<T>>() {
+              @Override
+              public RegisterApplicationMasterResponseInfo<T> invoke(
+                  T testContext) {
+                RegisterApplicationMasterResponseInfo<T> response = null;
+                try {
+                  int index = testContexts.indexOf(testContext);
+                  response =
+                      new RegisterApplicationMasterResponseInfo<T>(
+                          registerApplicationMaster(index), testContext);
+                  Assert.assertNotNull(response.getResponse());
+                  Assert.assertEquals(Integer.toString(index), response
+                      .getResponse().getQueue());
+
+                  LOG.info("Sucessfully registered application master with test context: "
+                      + testContext);
+                } catch (Throwable ex) {
+                  response = null;
+                  LOG.error("Failed to register application master with test context: "
+                      + testContext);
+                }
+
+                return response;
+              }
+            });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        testContexts.size(), responses.size());
+
+    Set<T> contextResponses = new TreeSet<T>();
+    for (RegisterApplicationMasterResponseInfo<T> item : responses) {
+      contextResponses.add(item.getTestContext());
+    }
+
+    for (T ep : testContexts) {
+      Assert.assertTrue(contextResponses.contains(ep));
+    }
+
+    return responses;
+  }
+
+  /**
+   * Unregisters the application master for specified application id
+   * 
+   * @param appId
+   * @param status
+   * @return
+   * @throws Exception
+   * @throws YarnException
+   * @throws IOException
+   */
+  protected FinishApplicationMasterResponse finishApplicationMaster(
+      final int appId, final FinalApplicationStatus status)
+      throws Exception, YarnException, IOException {
+
+    final ApplicationUserInfo ugi = getApplicationUserInfo(appId);
+
+    return ugi.getUser().doAs(
+        new PrivilegedExceptionAction<FinishApplicationMasterResponse>() {
+          @Override
+          public FinishApplicationMasterResponse run() throws Exception {
+            final FinishApplicationMasterRequest req =
+                Records.newRecord(FinishApplicationMasterRequest.class);
+            req.setDiagnostics("");
+            req.setTrackingUrl("");
+            req.setFinalApplicationStatus(status);
+
+            FinishApplicationMasterResponse response =
+                getAMRMProxyService().finishApplicationMaster(req);
+
+            getAMRMProxyService().stopApp(
+                ugi.getAppAttemptId().getApplicationId());
+
+            return response;
+          }
+        });
+  }
+
+  protected <T> List<FinishApplicationMasterResponseInfo<T>> finishApplicationMastersInParallel(
+      final ArrayList<T> testContexts) {
+    List<FinishApplicationMasterResponseInfo<T>> responses =
+        runInParallel(testContexts,
+            new Function<T, FinishApplicationMasterResponseInfo<T>>() {
+              @Override
+              public FinishApplicationMasterResponseInfo<T> invoke(
+                  T testContext) {
+                FinishApplicationMasterResponseInfo<T> response = null;
+                try {
+                  response =
+                      new FinishApplicationMasterResponseInfo<T>(
+                          finishApplicationMaster(
+                              testContexts.indexOf(testContext),
+                              FinalApplicationStatus.SUCCEEDED),
+                          testContext);
+                  Assert.assertNotNull(response.getResponse());
+
+                  LOG.info("Sucessfully finished application master with test contexts: "
+                      + testContext);
+                } catch (Throwable ex) {
+                  response = null;
+                  LOG.error("Failed to finish application master with test context: "
+                      + testContext);
+                }
+
+                return response;
+              }
+            });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        testContexts.size(), responses.size());
+
+    Set<T> contextResponses = new TreeSet<T>();
+    for (FinishApplicationMasterResponseInfo<T> item : responses) {
+      Assert.assertNotNull(item);
+      Assert.assertNotNull(item.getResponse());
+      contextResponses.add(item.getTestContext());
+    }
+
+    for (T ep : testContexts) {
+      Assert.assertTrue(contextResponses.contains(ep));
+    }
+
+    return responses;
+  }
+
+  protected AllocateResponse allocate(final int testAppId)
+      throws Exception, YarnException, IOException {
+    final AllocateRequest req = Records.newRecord(AllocateRequest.class);
+    req.setResponseId(testAppId);
+    return allocate(testAppId, req);
+  }
+
+  protected AllocateResponse allocate(final int testAppId,
+      final AllocateRequest request) throws Exception, YarnException,
+      IOException {
+
+    final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
+
+    return ugi.getUser().doAs(
+        new PrivilegedExceptionAction<AllocateResponse>() {
+          @Override
+          public AllocateResponse run() throws Exception {
+            AllocateResponse response =
+                getAMRMProxyService().allocate(request);
+            return response;
+          }
+        });
+  }
+
+  protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) {
+    final ApplicationAttemptId attemptId =
+        getApplicationAttemptId(testAppId);
+
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(attemptId.toString());
+    AMRMTokenIdentifier token = new AMRMTokenIdentifier(attemptId, 1);
+    ugi.addTokenIdentifier(token);
+    return new ApplicationUserInfo(ugi, attemptId);
+  }
+
+  protected List<ResourceRequest> createResourceRequests(String[] hosts,
+      int memory, int vCores, int priority, int containers)
+      throws Exception {
+    return createResourceRequests(hosts, memory, vCores, priority,
+        containers, null);
+  }
+
+  protected List<ResourceRequest> createResourceRequests(String[] hosts,
+      int memory, int vCores, int priority, int containers,
+      String labelExpression) throws Exception {
+    List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
+    for (String host : hosts) {
+      ResourceRequest hostReq =
+          createResourceRequest(host, memory, vCores, priority,
+              containers, labelExpression);
+      reqs.add(hostReq);
+      ResourceRequest rackReq =
+          createResourceRequest("/default-rack", memory, vCores, priority,
+              containers, labelExpression);
+      reqs.add(rackReq);
+    }
+
+    ResourceRequest offRackReq =
+        createResourceRequest(ResourceRequest.ANY, memory, vCores,
+            priority, containers, labelExpression);
+    reqs.add(offRackReq);
+    return reqs;
+  }
+
+  protected ResourceRequest createResourceRequest(String resource,
+      int memory, int vCores, int priority, int containers)
+      throws Exception {
+    return createResourceRequest(resource, memory, vCores, priority,
+        containers, null);
+  }
+
+  protected ResourceRequest createResourceRequest(String resource,
+      int memory, int vCores, int priority, int containers,
+      String labelExpression) throws Exception {
+    ResourceRequest req = Records.newRecord(ResourceRequest.class);
+    req.setResourceName(resource);
+    req.setNumContainers(containers);
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(priority);
+    req.setPriority(pri);
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(memory);
+    capability.setVirtualCores(vCores);
+    req.setCapability(capability);
+    if (labelExpression != null) {
+      req.setNodeLabelExpression(labelExpression);
+    }
+    return req;
+  }
+
+  /**
+   * Returns an ApplicationId with the specified identifier
+   * 
+   * @param testAppId
+   * @return
+   */
+  protected ApplicationId getApplicationId(int testAppId) {
+    return ApplicationId.newInstance(123456, testAppId);
+  }
+
+  /**
+   * Return an instance of ApplicationAttemptId using specified identifier. This
+   * identifier will be used for the ApplicationId too.
+   * 
+   * @param testAppId
+   * @return
+   */
+  protected ApplicationAttemptId getApplicationAttemptId(int testAppId) {
+    return ApplicationAttemptId.newInstance(getApplicationId(testAppId),
+        testAppId);
+  }
+
+  /**
+   * Return an instance of ApplicationAttemptId using specified identifier and
+   * application id
+   * 
+   * @param testAppId
+   * @return
+   */
+  protected ApplicationAttemptId getApplicationAttemptId(int testAppId,
+      ApplicationId appId) {
+    return ApplicationAttemptId.newInstance(appId, testAppId);
+  }
+
+  protected static class RegisterApplicationMasterResponseInfo<T> {
+    private RegisterApplicationMasterResponse response;
+    private T testContext;
+
+    RegisterApplicationMasterResponseInfo(
+        RegisterApplicationMasterResponse response, T testContext) {
+      this.response = response;
+      this.testContext = testContext;
+    }
+
+    public RegisterApplicationMasterResponse getResponse() {
+      return response;
+    }
+
+    public T getTestContext() {
+      return testContext;
+    }
+  }
+
+  protected static class FinishApplicationMasterResponseInfo<T> {
+    private FinishApplicationMasterResponse response;
+    private T testContext;
+
+    FinishApplicationMasterResponseInfo(
+        FinishApplicationMasterResponse response, T testContext) {
+      this.response = response;
+      this.testContext = testContext;
+    }
+
+    public FinishApplicationMasterResponse getResponse() {
+      return response;
+    }
+
+    public T getTestContext() {
+      return testContext;
+    }
+  }
+
+  protected static class ApplicationUserInfo {
+    private UserGroupInformation user;
+    private ApplicationAttemptId attemptId;
+
+    ApplicationUserInfo(UserGroupInformation user,
+        ApplicationAttemptId attemptId) {
+      this.user = user;
+      this.attemptId = attemptId;
+    }
+
+    public UserGroupInformation getUser() {
+      return this.user;
+    }
+
+    public ApplicationAttemptId getAppAttemptId() {
+      return this.attemptId;
+    }
+  }
+
+  protected static class MockAMRMProxyService extends AMRMProxyService {
+    public MockAMRMProxyService(Context nmContext,
+        AsyncDispatcher dispatcher) {
+      super(nmContext, dispatcher);
+    }
+
+    /**
+     * This method is used by the test code to initialize the pipeline. In the
+     * actual service, the initialization is called by the
+     * ContainerManagerImpl::StartContainers method
+     * 
+     * @param applicationId
+     * @param user
+     */
+    public void initApp(ApplicationAttemptId applicationId, String user) {
+      super.initializePipeline(applicationId, user, null, null);
+    }
+
+    public void stopApp(ApplicationId applicationId) {
+      super.stopApplication(applicationId);
+    }
+  }
+
+  /**
+   * The Function interface is used for passing method pointers that can be
+   * invoked asynchronously at a later point.
+   */
+  protected interface Function<T, R> {
+    public R invoke(T input);
+  }
+
+  protected class NullContext implements Context {
+
+    @Override
+    public NodeId getNodeId() {
+      return null;
+    }
+
+    @Override
+    public int getHttpPort() {
+      return 0;
+    }
+
+    @Override
+    public ConcurrentMap<ApplicationId, Application> getApplications() {
+      return null;
+    }
+
+    @Override
+    public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
+      return null;
+    }
+
+    @Override
+    public ConcurrentMap<ContainerId, Container> getContainers() {
+      return null;
+    }
+
+    @Override
+    public NMContainerTokenSecretManager getContainerTokenSecretManager() {
+      return null;
+    }
+
+    @Override
+    public NMTokenSecretManagerInNM getNMTokenSecretManager() {
+      return null;
+    }
+
+    @Override
+    public NodeHealthStatus getNodeHealthStatus() {
+      return null;
+    }
+
+    @Override
+    public ContainerManagementProtocol getContainerManager() {
+      return null;
+    }
+
+    @Override
+    public LocalDirsHandlerService getLocalDirsHandler() {
+      return null;
+    }
+
+    @Override
+    public ApplicationACLsManager getApplicationACLsManager() {
+      return null;
+    }
+
+    @Override
+    public NMStateStoreService getNMStateStore() {
+      return null;
+    }
+
+    @Override
+    public boolean getDecommissioned() {
+      return false;
+    }
+
+    @Override
+    public void setDecommissioned(boolean isDecommissioned) {
+    }
+
+    @Override
+    public ConcurrentLinkedQueue<LogAggregationReport> getLogAggregationStatusForApps() {
+      return null;
+    }
+
+    @Override
+    public NodeResourceMonitor getNodeResourceMonitor() {
+      return null;
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
new file mode 100644
index 0000000..c962f97
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class MockRequestInterceptor extends AbstractRequestInterceptor {
+
+  private MockResourceManagerFacade mockRM;
+
+  public MockRequestInterceptor() {
+  }
+
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    mockRM =
+        new MockResourceManagerFacade(new YarnConfiguration(
+            super.getConf()), 0);
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return mockRM.registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return mockRM.finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    return mockRM.allocate(request);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
new file mode 100644
index 0000000..7573a7a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.directory.api.util.Strings;
+import org.apache.directory.api.util.exception.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.mortbay.log.Log;
+
+/**
+ * Mock Resource Manager facade implementation that exposes all the methods
+ * implemented by the YARN RM. The behavior and the values returned by this mock
+ * implementation is expected by the unit test cases. So please change the
+ * implementation with care.
+ */
+public class MockResourceManagerFacade implements
+    ApplicationMasterProtocol, ApplicationClientProtocol {
+
+  private HashMap<String, List<ContainerId>> applicationContainerIdMap =
+      new HashMap<String, List<ContainerId>>();
+  private HashMap<ContainerId, Container> allocatedContainerMap =
+      new HashMap<ContainerId, Container>();
+  private AtomicInteger containerIndex = new AtomicInteger(0);
+  private Configuration conf;
+
+  public MockResourceManagerFacade(Configuration conf,
+      int startContainerIndex) {
+    this.conf = conf;
+    this.containerIndex.set(startContainerIndex);
+  }
+
+  private static String getAppIdentifier() throws IOException {
+    AMRMTokenIdentifier result = null;
+    UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
+    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
+    for (TokenIdentifier tokenId : tokenIds) {
+      if (tokenId instanceof AMRMTokenIdentifier) {
+        result = (AMRMTokenIdentifier) tokenId;
+        break;
+      }
+    }
+    return result != null ? result.getApplicationAttemptId().toString()
+        : "";
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    String amrmToken = getAppIdentifier();
+    Log.info("Registering application attempt: " + amrmToken);
+
+    synchronized (applicationContainerIdMap) {
+      Assert.assertFalse("The application id is already registered: "
+          + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+      // Keep track of the containers that are returned to this application
+      applicationContainerIdMap.put(amrmToken,
+          new ArrayList<ContainerId>());
+    }
+
+    return RegisterApplicationMasterResponse.newInstance(null, null, null,
+        null, null, request.getHost(), null);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    String amrmToken = getAppIdentifier();
+    Log.info("Finishing application attempt: " + amrmToken);
+
+    synchronized (applicationContainerIdMap) {
+      // Remove the containers that were being tracked for this application
+      Assert.assertTrue("The application id is NOT registered: "
+          + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+      List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken);
+      for (ContainerId c : ids) {
+        allocatedContainerMap.remove(c);
+      }
+    }
+
+    return FinishApplicationMasterResponse
+        .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true
+            : false);
+  }
+
+  protected ApplicationId getApplicationId(int id) {
+    return ApplicationId.newInstance(12345, id);
+  }
+
+  protected ApplicationAttemptId getApplicationAttemptId(int id) {
+    return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    if (request.getAskList() != null && request.getAskList().size() > 0
+        && request.getReleaseList() != null
+        && request.getReleaseList().size() > 0) {
+      Assert.fail("The mock RM implementation does not support receiving "
+          + "askList and releaseList in the same heartbeat");
+    }
+
+    String amrmToken = getAppIdentifier();
+
+    ArrayList<Container> containerList = new ArrayList<Container>();
+    if (request.getAskList() != null) {
+      for (ResourceRequest rr : request.getAskList()) {
+        for (int i = 0; i < rr.getNumContainers(); i++) {
+          ContainerId containerId =
+              ContainerId.newInstance(getApplicationAttemptId(1),
+                  containerIndex.incrementAndGet());
+          Container container = Records.newRecord(Container.class);
+          container.setId(containerId);
+          container.setPriority(rr.getPriority());
+
+          // We don't use the node for running containers in the test cases. So
+          // it is OK to hard code it to some dummy value
+          NodeId nodeId =
+              NodeId.newInstance(
+                  !Strings.isEmpty(rr.getResourceName()) ? rr
+                      .getResourceName() : "dummy", 1000);
+          container.setNodeId(nodeId);
+          container.setResource(rr.getCapability());
+          containerList.add(container);
+
+          synchronized (applicationContainerIdMap) {
+            // Keep track of the containers returned to this application. We
+            // will need it in future
+            Assert.assertTrue(
+                "The application id is Not registered before allocate(): "
+                    + amrmToken,
+                applicationContainerIdMap.containsKey(amrmToken));
+            List<ContainerId> ids =
+                applicationContainerIdMap.get(amrmToken);
+            ids.add(containerId);
+            this.allocatedContainerMap.put(containerId, container);
+          }
+        }
+      }
+    }
+
+    if (request.getReleaseList() != null
+        && request.getReleaseList().size() > 0) {
+      Log.info("Releasing containers: " + request.getReleaseList().size());
+      synchronized (applicationContainerIdMap) {
+        Assert.assertTrue(
+            "The application id is not registered before allocate(): "
+                + amrmToken,
+            applicationContainerIdMap.containsKey(amrmToken));
+        List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
+
+        for (ContainerId id : request.getReleaseList()) {
+          boolean found = false;
+          for (ContainerId c : ids) {
+            if (c.equals(id)) {
+              found = true;
+              break;
+            }
+          }
+
+          Assert.assertTrue(
+              "ContainerId " + id
+                  + " being released is not valid for application: "
+                  + conf.get("AMRMTOKEN"), found);
+
+          ids.remove(id);
+
+          // Return the released container back to the AM with new fake Ids. The
+          // test case does not care about the IDs. The IDs are faked because
+          // otherwise the LRM will throw duplication identifier exception. This
+          // returning of fake containers is ONLY done for testing purpose - for
+          // the test code to get confirmation that the sub-cluster resource
+          // managers received the release request
+          ContainerId fakeContainerId =
+              ContainerId.newInstance(getApplicationAttemptId(1),
+                  containerIndex.incrementAndGet());
+          Container fakeContainer = allocatedContainerMap.get(id);
+          fakeContainer.setId(fakeContainerId);
+          containerList.add(fakeContainer);
+        }
+      }
+    }
+
+    Log.info("Allocating containers: " + containerList.size()
+        + " for application attempt: " + conf.get("AMRMTOKEN"));
+    return AllocateResponse.newInstance(0,
+        new ArrayList<ContainerStatus>(), containerList,
+        new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
+        new ArrayList<NMToken>(),
+        new ArrayList<ContainerResourceIncrease>(),
+        new ArrayList<ContainerResourceDecrease>());
+  }
+
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnException,
+      IOException {
+
+    GetApplicationReportResponse response =
+        Records.newRecord(GetApplicationReportResponse.class);
+    ApplicationReport report = Records.newRecord(ApplicationReport.class);
+    report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
+    report.setApplicationId(request.getApplicationId());
+    report.setCurrentApplicationAttemptId(ApplicationAttemptId
+        .newInstance(request.getApplicationId(), 1));
+    response.setApplicationReport(report);
+    return response;
+  }
+
+  @Override
+  public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      GetApplicationAttemptReportRequest request) throws YarnException,
+      IOException {
+    GetApplicationAttemptReportResponse response =
+        Records.newRecord(GetApplicationAttemptReportResponse.class);
+    ApplicationAttemptReport report =
+        Records.newRecord(ApplicationAttemptReport.class);
+    report.setApplicationAttemptId(request.getApplicationAttemptId());
+    report
+        .setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED);
+    response.setApplicationAttemptReport(report);
+    return response;
+  }
+
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetApplicationsResponse getApplications(
+      GetApplicationsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterNodesResponse getClusterNodes(
+      GetClusterNodesRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetApplicationAttemptsResponse getApplicationAttempts(
+      GetApplicationAttemptsRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetContainerReportResponse getContainerReport(
+      GetContainerReportRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetContainersResponse getContainers(GetContainersRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(
+      GetNodesToLabelsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetLabelsToNodesResponse getLabelsToNodes(
+      GetLabelsToNodesRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public UpdateApplicationPriorityResponse updateApplicationPriority(
+      UpdateApplicationPriorityRequest request) throws YarnException,
+      IOException {
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
new file mode 100644
index 0000000..97a844e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Mock intercepter that does not do anything other than forwarding it to the
+ * next intercepter in the chain
+ *
+ */
+public class PassThroughRequestInterceptor extends
+    AbstractRequestInterceptor {
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return getNextInterceptor().registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return getNextInterceptor().finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    return getNextInterceptor().allocate(request);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
new file mode 100644
index 0000000..69b913a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAMRMProxyService extends BaseAMRMProxyTest {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestAMRMProxyService.class);
+
+  /**
+   * Test if the pipeline is created properly.
+   */
+  @Test
+  public void testRequestInterceptorChainCreation() throws Exception {
+    RequestInterceptor root =
+        super.getAMRMProxyService().createRequestInterceptorChain();
+    int index = 0;
+    while (root != null) {
+      switch (index) {
+      case 0:
+      case 1:
+      case 2:
+        Assert.assertEquals(PassThroughRequestInterceptor.class.getName(),
+            root.getClass().getName());
+        break;
+      case 3:
+        Assert.assertEquals(MockRequestInterceptor.class.getName(), root
+            .getClass().getName());
+        break;
+      }
+
+      root = root.getNextInterceptor();
+      index++;
+    }
+
+    Assert.assertEquals(
+        "The number of interceptors in chain does not match",
+        Integer.toString(4), Integer.toString(index));
+
+  }
+
+  /**
+   * Tests registration of a single application master.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRegisterOneApplicationMaster() throws Exception {
+    // The testAppId identifier is used as host name and the mock resource
+    // manager return it as the queue name. Assert that we received the queue
+    // name
+    int testAppId = 1;
+    RegisterApplicationMasterResponse response1 =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(response1);
+    Assert.assertEquals(Integer.toString(testAppId), response1.getQueue());
+  }
+
+  /**
+   * Tests the registration of multiple application master serially one at a
+   * time.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRegisterMulitpleApplicationMasters() throws Exception {
+    for (int testAppId = 0; testAppId < 3; testAppId++) {
+      RegisterApplicationMasterResponse response =
+          registerApplicationMaster(testAppId);
+      Assert.assertNotNull(response);
+      Assert
+          .assertEquals(Integer.toString(testAppId), response.getQueue());
+    }
+  }
+
+  /**
+   * Tests the registration of multiple application masters using multiple
+   * threads in parallel.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRegisterMulitpleApplicationMastersInParallel()
+      throws Exception {
+    int numberOfRequests = 5;
+    ArrayList<String> testContexts =
+        CreateTestRequestIdentifiers(numberOfRequests);
+    super.registerApplicationMastersInParallel(testContexts);
+  }
+
+  private ArrayList<String> CreateTestRequestIdentifiers(
+      int numberOfRequests) {
+    ArrayList<String> testContexts = new ArrayList<String>();
+    LOG.info("Creating " + numberOfRequests + " contexts for testing");
+    for (int ep = 0; ep < numberOfRequests; ep++) {
+      testContexts.add("test-endpoint-" + Integer.toString(ep));
+      LOG.info("Created test context: " + testContexts.get(ep));
+    }
+    return testContexts;
+  }
+
+  @Test
+  public void testFinishOneApplicationMasterWithSuccess() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId,
+            FinalApplicationStatus.SUCCEEDED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(true, finshResponse.getIsUnregistered());
+  }
+
+  @Test
+  public void testFinishOneApplicationMasterWithFailure() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(false, finshResponse.getIsUnregistered());
+
+    try {
+      // Try to finish an application master that is already finished.
+      finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+  }
+
+  @Test
+  public void testFinishInvalidApplicationMaster() throws Exception {
+    try {
+      // Try to finish an application master that was not registered.
+      finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+  }
+
+  @Test
+  public void testFinishMulitpleApplicationMasters() throws Exception {
+    int numberOfRequests = 3;
+    for (int index = 0; index < numberOfRequests; index++) {
+      RegisterApplicationMasterResponse registerResponse =
+          registerApplicationMaster(index);
+      Assert.assertNotNull(registerResponse);
+      Assert.assertEquals(Integer.toString(index),
+          registerResponse.getQueue());
+    }
+
+    // Finish in reverse sequence
+    for (int index = numberOfRequests - 1; index >= 0; index--) {
+      FinishApplicationMasterResponse finshResponse =
+          finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED);
+
+      Assert.assertNotNull(finshResponse);
+      Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+      // Assert that the application has been removed from the collection
+      Assert.assertTrue(this.getAMRMProxyService()
+          .getPipelines().size() == index);
+    }
+
+    try {
+      // Try to finish an application master that is already finished.
+      finishApplicationMaster(1, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+
+    try {
+      // Try to finish an application master that was not registered.
+      finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+  }
+
+  @Test
+  public void testFinishMulitpleApplicationMastersInParallel()
+      throws Exception {
+    int numberOfRequests = 5;
+    ArrayList<String> testContexts = new ArrayList<String>();
+    LOG.info("Creating " + numberOfRequests + " contexts for testing");
+    for (int i = 0; i < numberOfRequests; i++) {
+      testContexts.add("test-endpoint-" + Integer.toString(i));
+      LOG.info("Created test context: " + testContexts.get(i));
+
+      RegisterApplicationMasterResponse registerResponse =
+          registerApplicationMaster(i);
+      Assert.assertNotNull(registerResponse);
+      Assert
+          .assertEquals(Integer.toString(i), registerResponse.getQueue());
+    }
+
+    finishApplicationMastersInParallel(testContexts);
+  }
+
+  @Test
+  public void testAllocateRequestWithNullValues() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    AllocateResponse allocateResponse = allocate(testAppId);
+    Assert.assertNotNull(allocateResponse);
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId,
+            FinalApplicationStatus.SUCCEEDED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(true, finshResponse.getIsUnregistered());
+  }
+
+  @Test
+  public void testAllocateRequestWithoutRegistering() throws Exception {
+
+    try {
+      // Try to allocate an application master without registering.
+      allocate(1);
+      Assert
+          .fail("The request to allocate application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("AllocateRequest failed as expected because AM was not registered");
+    }
+  }
+
+  @Test
+  public void testAllocateWithOneResourceRequest() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    getContainersAndAssert(testAppId, 1);
+    finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+  }
+
+  @Test
+  public void testAllocateWithMultipleResourceRequest() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    getContainersAndAssert(testAppId, 10);
+    finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+  }
+
+  @Test
+  public void testAllocateAndReleaseContainers() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    List<Container> containers = getContainersAndAssert(testAppId, 10);
+    releaseContainersAndAssert(testAppId, containers);
+    finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+  }
+
+  @Test
+  public void testAllocateAndReleaseContainersForMultipleAM()
+      throws Exception {
+    int numberOfApps = 5;
+    for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
+      RegisterApplicationMasterResponse registerResponse =
+          registerApplicationMaster(testAppId);
+      Assert.assertNotNull(registerResponse);
+      List<Container> containers = getContainersAndAssert(testAppId, 10);
+      releaseContainersAndAssert(testAppId, containers);
+    }
+    for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
+      finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+    }
+  }
+
+  @Test
+  public void testAllocateAndReleaseContainersForMultipleAMInParallel()
+      throws Exception {
+    int numberOfApps = 6;
+    ArrayList<Integer> tempAppIds = new ArrayList<Integer>();
+    for (int i = 0; i < numberOfApps; i++) {
+      tempAppIds.add(new Integer(i));
+    }
+
+    final ArrayList<Integer> appIds = tempAppIds;
+    List<Integer> responses =
+        runInParallel(appIds, new Function<Integer, Integer>() {
+          @Override
+          public Integer invoke(Integer testAppId) {
+            try {
+              RegisterApplicationMasterResponse registerResponse =
+                  registerApplicationMaster(testAppId);
+              Assert.assertNotNull("response is null", registerResponse);
+              List<Container> containers =
+                  getContainersAndAssert(testAppId, 10);
+              releaseContainersAndAssert(testAppId, containers);
+
+              LOG.info("Sucessfully registered application master with appId: "
+                  + testAppId);
+            } catch (Throwable ex) {
+              LOG.error(
+                  "Failed to register application master with appId: "
+                      + testAppId, ex);
+              testAppId = null;
+            }
+
+            return testAppId;
+          }
+        });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        appIds.size(), responses.size());
+
+    for (Integer testAppId : responses) {
+      Assert.assertNotNull(testAppId);
+      finishApplicationMaster(testAppId.intValue(),
+          FinalApplicationStatus.SUCCEEDED);
+    }
+  }
+
+  private List<Container> getContainersAndAssert(int appId,
+      int numberOfResourceRequests) throws Exception {
+    AllocateRequest allocateRequest =
+        Records.newRecord(AllocateRequest.class);
+    allocateRequest.setResponseId(1);
+
+    List<Container> containers =
+        new ArrayList<Container>(numberOfResourceRequests);
+    List<ResourceRequest> askList =
+        new ArrayList<ResourceRequest>(numberOfResourceRequests);
+    for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) {
+      askList.add(createResourceRequest(
+          "test-node-" + Integer.toString(testAppId), 6000, 2,
+          testAppId % 5, 1));
+    }
+
+    allocateRequest.setAskList(askList);
+
+    AllocateResponse allocateResponse = allocate(appId, allocateRequest);
+    Assert.assertNotNull("allocate() returned null response",
+        allocateResponse);
+
+    containers.addAll(allocateResponse.getAllocatedContainers());
+
+    // Send max 10 heart beats to receive all the containers. If not, we will
+    // fail the test
+    int numHeartbeat = 0;
+    while (containers.size() < askList.size() && numHeartbeat++ < 10) {
+      allocateResponse =
+          allocate(appId, Records.newRecord(AllocateRequest.class));
+      Assert.assertNotNull("allocate() returned null response",
+          allocateResponse);
+
+      containers.addAll(allocateResponse.getAllocatedContainers());
+
+      LOG.info("Number of allocated containers in this request: "
+          + Integer.toString(allocateResponse.getAllocatedContainers()
+              .size()));
+      LOG.info("Total number of allocated containers: "
+          + Integer.toString(containers.size()));
+      Thread.sleep(10);
+    }
+
+    // We broadcast the request, the number of containers we received will be
+    // higher than we ask
+    Assert.assertTrue("The asklist count is not same as response",
+        askList.size() <= containers.size());
+    return containers;
+  }
+
+  private void releaseContainersAndAssert(int appId,
+      List<Container> containers) throws Exception {
+    Assert.assertTrue(containers.size() > 0);
+    AllocateRequest allocateRequest =
+        Records.newRecord(AllocateRequest.class);
+    allocateRequest.setResponseId(1);
+
+    List<ContainerId> relList =
+        new ArrayList<ContainerId>(containers.size());
+    for (Container container : containers) {
+      relList.add(container.getId());
+    }
+
+    allocateRequest.setReleaseList(relList);
+
+    AllocateResponse allocateResponse = allocate(appId, allocateRequest);
+    Assert.assertNotNull(allocateResponse);
+
+    // The way the mock resource manager is setup, it will return the containers
+    // that were released in the response. This is done because the UAMs run
+    // asynchronously and we need to if all the resource managers received the
+    // release it. The containers sent by the mock resource managers will be
+    // aggregated and returned back to us and we can assert if all the release
+    // lists reached the sub-clusters
+    List<Container> containersForReleasedContainerIds =
+        new ArrayList<Container>();
+    containersForReleasedContainerIds.addAll(allocateResponse
+        .getAllocatedContainers());
+
+    // Send max 10 heart beats to receive all the containers. If not, we will
+    // fail the test
+    int numHeartbeat = 0;
+    while (containersForReleasedContainerIds.size() < relList.size()
+        && numHeartbeat++ < 10) {
+      allocateResponse =
+          allocate(appId, Records.newRecord(AllocateRequest.class));
+      Assert.assertNotNull(allocateResponse);
+      containersForReleasedContainerIds.addAll(allocateResponse
+          .getAllocatedContainers());
+
+      LOG.info("Number of containers received in this request: "
+          + Integer.toString(allocateResponse.getAllocatedContainers()
+              .size()));
+      LOG.info("Total number of containers received: "
+          + Integer.toString(containersForReleasedContainerIds.size()));
+      Thread.sleep(10);
+    }
+
+    Assert.assertEquals(relList.size(),
+        containersForReleasedContainerIds.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index c8b985d..14142de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -40,9 +40,7 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -76,7 +74,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -95,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -175,69 +173,13 @@ public class ApplicationMasterService extends AbstractService implements
     return this.masterServiceAddress;
   }
 
-  // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
-  // currently sets only the required id, but iterate through anyways just to be
-  // sure.
-  private AMRMTokenIdentifier selectAMRMTokenIdentifier(
-      UserGroupInformation remoteUgi) throws IOException {
-    AMRMTokenIdentifier result = null;
-    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
-    for (TokenIdentifier tokenId : tokenIds) {
-      if (tokenId instanceof AMRMTokenIdentifier) {
-        result = (AMRMTokenIdentifier) tokenId;
-        break;
-      }
-    }
-
-    return result;
-  }
-
-  private AMRMTokenIdentifier authorizeRequest()
-      throws YarnException {
-
-    UserGroupInformation remoteUgi;
-    try {
-      remoteUgi = UserGroupInformation.getCurrentUser();
-    } catch (IOException e) {
-      String msg =
-          "Cannot obtain the user-name for authorizing ApplicationMaster. "
-              + "Got exception: " + StringUtils.stringifyException(e);
-      LOG.warn(msg);
-      throw RPCUtil.getRemoteException(msg);
-    }
-
-    boolean tokenFound = false;
-    String message = "";
-    AMRMTokenIdentifier appTokenIdentifier = null;
-    try {
-      appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
-      if (appTokenIdentifier == null) {
-        tokenFound = false;
-        message = "No AMRMToken found for user " + remoteUgi.getUserName();
-      } else {
-        tokenFound = true;
-      }
-    } catch (IOException e) {
-      tokenFound = false;
-      message =
-          "Got exception while looking for AMRMToken for user "
-              + remoteUgi.getUserName();
-    }
-
-    if (!tokenFound) {
-      LOG.warn(message);
-      throw RPCUtil.getRemoteException(message);
-    }
-
-    return appTokenIdentifier;
-  }
-
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
     ApplicationAttemptId applicationAttemptId =
         amrmTokenIdentifier.getApplicationAttemptId();
 
@@ -346,7 +288,7 @@ public class ApplicationMasterService extends AbstractService implements
       IOException {
 
     ApplicationAttemptId applicationAttemptId =
-        authorizeRequest().getApplicationAttemptId();
+        YarnServerSecurityUtils.authorizeRequest().getApplicationAttemptId();
     ApplicationId appId = applicationAttemptId.getApplicationId();
 
     RMApp rmApp =
@@ -430,7 +372,8 @@ public class ApplicationMasterService extends AbstractService implements
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
 
-    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
 
     ApplicationAttemptId appAttemptId =
         amrmTokenIdentifier.getApplicationAttemptId();


[2/2] hadoop git commit: YARN-2884. Added a proxy service in NM to proxy the the communication between AM and RM. Contributed by Kishore Chaliparambil

Posted by ji...@apache.org.
YARN-2884. Added a proxy service in NM to proxy the the communication between AM and RM. Contributed by Kishore Chaliparambil


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f72f1e6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f72f1e6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f72f1e6

Branch: refs/heads/trunk
Commit: 6f72f1e6003ab11679bebeb96f27f1f62b3b3e02
Parents: 9b78e6e
Author: Jian He <ji...@apache.org>
Authored: Tue Sep 8 09:35:46 2015 +0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Sep 8 09:35:46 2015 +0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  17 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   2 +
 .../src/main/resources/yarn-default.xml         |  34 +
 .../server/utils/YarnServerSecurityUtils.java   | 142 ++++
 .../amrmproxy/AMRMProxyApplicationContext.java  |  70 ++
 .../AMRMProxyApplicationContextImpl.java        | 132 ++++
 .../nodemanager/amrmproxy/AMRMProxyService.java | 592 ++++++++++++++++
 .../amrmproxy/AMRMProxyTokenSecretManager.java  | 265 ++++++++
 .../amrmproxy/AbstractRequestInterceptor.java   | 102 +++
 .../amrmproxy/DefaultRequestInterceptor.java    | 138 ++++
 .../amrmproxy/RequestInterceptor.java           |  71 ++
 .../containermanager/ContainerManagerImpl.java  |  67 +-
 .../amrmproxy/BaseAMRMProxyTest.java            | 677 +++++++++++++++++++
 .../amrmproxy/MockRequestInterceptor.java       |  65 ++
 .../amrmproxy/MockResourceManagerFacade.java    | 469 +++++++++++++
 .../PassThroughRequestInterceptor.java          |  58 ++
 .../amrmproxy/TestAMRMProxyService.java         | 484 +++++++++++++
 .../ApplicationMasterService.java               |  69 +-
 19 files changed, 3366 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a162070..be1feb4 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -187,6 +187,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3970. Add REST api support for Application Priority.
     (Naganarasimha G R via vvasudev)
 
+    YARN-2884. Added a proxy service in NM to proxy the the communication
+    between AM and RM. (Kishore Chaliparambil via jianhe) 
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4879ca1..9ec25ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1332,6 +1332,23 @@ public class YarnConfiguration extends Configuration {
   public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
       + "application.classpath";
 
+  public static final String AMRM_PROXY_ENABLED = NM_PREFIX
+      + "amrmproxy.enable";
+  public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
+  public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+      + "amrmproxy.address";
+  public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
+  public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
+      + DEFAULT_AMRM_PROXY_PORT;
+  public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
+      + "amrmproxy.client.thread-count";
+  public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
+  public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+      NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
+  public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+      "org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
+          + "DefaultRequestInterceptor";
+
   /**
    * Default platform-agnostic CLASSPATH for YARN applications. A
    * comma-separated list of CLASSPATH entries. The parameter expansion marker

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index e89a90d..97fcfa1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -86,6 +86,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
         .add(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
     configurationPropsToSkipCompare
         .add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
 
     // Ignore all YARN Application Timeline Service (version 1) properties
     configurationPrefixToSkipCompare.add("yarn.timeline-service.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 59bfb56..b76defb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2259,4 +2259,38 @@
     <value></value>
   </property>
 
+  <property>
+    <description>
+    Enable/Disable AMRMProxyService in the node manager. This service is used to intercept
+    calls from the application masters to the resource manager.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.enable</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+    The address of the AMRMProxyService listener.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.address</name>
+    <value>0.0.0.0:8048</value>
+  </property>
+
+  <property>
+    <description>
+    The number of threads used to handle requests by the AMRMProxyService.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.client.thread-count</name>
+    <value>25</value>
+  </property>
+
+  <property>
+    <description>
+    The comma separated list of class names that implement the RequestInterceptor interface. This is used by the
+    AMRMProxyService to create the request processing pipeline for applications.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
+    <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
new file mode 100644
index 0000000..9af556e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that contains commonly used server methods.
+ *
+ */
+@Private
+public final class YarnServerSecurityUtils {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(YarnServerSecurityUtils.class);
+
+  private YarnServerSecurityUtils() {
+  }
+
+  /**
+   * Authorizes the current request and returns the AMRMTokenIdentifier for the
+   * current application.
+   *
+   * @return the AMRMTokenIdentifier instance for the current user
+   * @throws YarnException
+   */
+  public static AMRMTokenIdentifier authorizeRequest()
+      throws YarnException {
+
+    UserGroupInformation remoteUgi;
+    try {
+      remoteUgi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      String msg =
+          "Cannot obtain the user-name for authorizing ApplicationMaster. "
+              + "Got exception: " + StringUtils.stringifyException(e);
+      LOG.warn(msg);
+      throw RPCUtil.getRemoteException(msg);
+    }
+
+    boolean tokenFound = false;
+    String message = "";
+    AMRMTokenIdentifier appTokenIdentifier = null;
+    try {
+      appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
+      if (appTokenIdentifier == null) {
+        tokenFound = false;
+        message = "No AMRMToken found for user " + remoteUgi.getUserName();
+      } else {
+        tokenFound = true;
+      }
+    } catch (IOException e) {
+      tokenFound = false;
+      message =
+          "Got exception while looking for AMRMToken for user "
+              + remoteUgi.getUserName();
+    }
+
+    if (!tokenFound) {
+      LOG.warn(message);
+      throw RPCUtil.getRemoteException(message);
+    }
+
+    return appTokenIdentifier;
+  }
+
+  // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
+  // currently sets only the required id, but iterate through anyways just to be
+  // sure.
+  private static AMRMTokenIdentifier selectAMRMTokenIdentifier(
+      UserGroupInformation remoteUgi) throws IOException {
+    AMRMTokenIdentifier result = null;
+    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
+    for (TokenIdentifier tokenId : tokenIds) {
+      if (tokenId instanceof AMRMTokenIdentifier) {
+        result = (AMRMTokenIdentifier) tokenId;
+        break;
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Parses the container launch context and returns a Credential instance that
+   * contains all the tokens from the launch context. 
+   * @param launchContext
+   * @return the credential instance
+   * @throws IOException
+   */
+  public static Credentials parseCredentials(
+      ContainerLaunchContext launchContext) throws IOException {
+    Credentials credentials = new Credentials();
+    ByteBuffer tokens = launchContext.getTokens();
+
+    if (tokens != null) {
+      DataInputByteBuffer buf = new DataInputByteBuffer();
+      tokens.rewind();
+      buf.reset(tokens);
+      credentials.readTokenStorageStream(buf);
+      if (LOG.isDebugEnabled()) {
+        for (Token<? extends TokenIdentifier> tk : credentials
+            .getAllTokens()) {
+          LOG.debug(tk.getService() + " = " + tk.toString());
+        }
+      }
+    }
+
+    return credentials;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
new file mode 100644
index 0000000..c355a8b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Interface that can be used by the intercepter plugins to get the information
+ * about one application.
+ *
+ */
+public interface AMRMProxyApplicationContext {
+
+  /**
+   * Gets the configuration object instance.
+   * @return the configuration object.
+   */
+  Configuration getConf();
+
+  /**
+   * Gets the application attempt identifier.
+   * @return the application attempt identifier.
+   */
+  ApplicationAttemptId getApplicationAttemptId();
+
+  /**
+   * Gets the application submitter.
+   * @return the application submitter
+   */
+  String getUser();
+
+  /**
+   * Gets the application's AMRMToken that is issued by the RM.
+   * @return the application's AMRMToken that is issued by the RM.
+   */
+  Token<AMRMTokenIdentifier> getAMRMToken();
+
+  /**
+   * Gets the application's local AMRMToken issued by the proxy service.
+   * @return the application's local AMRMToken issued by the proxy service.
+   */
+  Token<AMRMTokenIdentifier> getLocalAMRMToken();
+
+  /**
+   * Gets the NMContext object.
+   * @return the NMContext.
+   */
+  Context getNMCotext();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
new file mode 100644
index 0000000..2e5aa94
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Encapsulates the information about one application that is needed by the
+ * request intercepters.
+ *
+ */
+public class AMRMProxyApplicationContextImpl implements
+    AMRMProxyApplicationContext {
+  private final Configuration conf;
+  private final Context nmContext;
+  private final ApplicationAttemptId applicationAttemptId;
+  private final String user;
+  private Integer localTokenKeyId;
+  private Token<AMRMTokenIdentifier> amrmToken;
+  private Token<AMRMTokenIdentifier> localToken;
+
+  /**
+   * Create an instance of the AMRMProxyApplicationContext.
+   * 
+   * @param nmContext
+   * @param conf
+   * @param applicationAttemptId
+   * @param user
+   * @param amrmToken
+   */
+  public AMRMProxyApplicationContextImpl(Context nmContext,
+      Configuration conf, ApplicationAttemptId applicationAttemptId,
+      String user, Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    this.nmContext = nmContext;
+    this.conf = conf;
+    this.applicationAttemptId = applicationAttemptId;
+    this.user = user;
+    this.amrmToken = amrmToken;
+    this.localToken = localToken;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public synchronized Token<AMRMTokenIdentifier> getAMRMToken() {
+    return amrmToken;
+  }
+
+  /**
+   * Sets the application's AMRMToken.
+   */
+  public synchronized void setAMRMToken(
+      Token<AMRMTokenIdentifier> amrmToken) {
+    this.amrmToken = amrmToken;
+  }
+
+  @Override
+  public synchronized Token<AMRMTokenIdentifier> getLocalAMRMToken() {
+    return this.localToken;
+  }
+
+  /**
+   * Sets the application's AMRMToken.
+   */
+  public synchronized void setLocalAMRMToken(
+      Token<AMRMTokenIdentifier> localToken) {
+    this.localToken = localToken;
+    this.localTokenKeyId = null;
+  }
+
+  @Private
+  public synchronized int getLocalAMRMTokenKeyId() {
+    Integer keyId = this.localTokenKeyId;
+    if (keyId == null) {
+      try {
+        if (this.localToken == null) {
+          throw new YarnRuntimeException("Missing AMRM token for "
+              + this.applicationAttemptId);
+        }
+        keyId = this.amrmToken.decodeIdentifier().getKeyId();
+        this.localTokenKeyId = keyId;
+      } catch (IOException e) {
+        throw new YarnRuntimeException("AMRM token decode error for "
+            + this.applicationAttemptId, e);
+      }
+    }
+    return keyId;
+  }
+
+  @Override
+  public Context getNMCotext() {
+    return nmContext;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
new file mode 100644
index 0000000..bd6538c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -0,0 +1,592 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * AMRMProxyService is a service that runs on each node manager that can be used
+ * to intercept and inspect messages from application master to the cluster
+ * resource manager. It listens to messages from the application master and
+ * creates a request intercepting pipeline instance for each application. The
+ * pipeline is a chain of intercepter instances that can inspect and modify the
+ * request/response as needed.
+ */
+public class AMRMProxyService extends AbstractService implements
+    ApplicationMasterProtocol {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AMRMProxyService.class);
+  private Server server;
+  private final Context nmContext;
+  private final AsyncDispatcher dispatcher;
+  private InetSocketAddress listenerEndpoint;
+  private AMRMProxyTokenSecretManager secretManager;
+  private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
+
+  /**
+   * Creates an instance of the service.
+   * 
+   * @param nmContext
+   * @param dispatcher
+   */
+  public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
+    super(AMRMProxyService.class.getName());
+    Preconditions.checkArgument(nmContext != null, "nmContext is null");
+    Preconditions.checkArgument(dispatcher != null, "dispatcher is null");
+    this.nmContext = nmContext;
+    this.dispatcher = dispatcher;
+    this.applPipelineMap =
+        new ConcurrentHashMap<ApplicationId, RequestInterceptorChainWrapper>();
+
+    this.dispatcher.register(ApplicationEventType.class,
+        new ApplicationEventHandler());
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting AMRMProxyService");
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    UserGroupInformation.setConfiguration(conf);
+
+    this.listenerEndpoint =
+        conf.getSocketAddr(YarnConfiguration.AMRM_PROXY_ADDRESS,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_PORT);
+
+    Configuration serverConf = new Configuration(conf);
+    serverConf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        SaslRpcServer.AuthMethod.TOKEN.toString());
+
+    int numWorkerThreads =
+        serverConf.getInt(
+            YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT);
+
+    this.secretManager = new AMRMProxyTokenSecretManager(serverConf);
+    this.secretManager.start();
+
+    this.server =
+        rpc.getServer(ApplicationMasterProtocol.class, this,
+            listenerEndpoint, serverConf, this.secretManager,
+            numWorkerThreads);
+
+    this.server.start();
+    LOG.info("AMRMProxyService listening on address: "
+        + this.server.getListenerAddress());
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping AMRMProxyService");
+    if (this.server != null) {
+      this.server.stop();
+    }
+
+    this.secretManager.stop();
+
+    super.serviceStop();
+  }
+
+  /**
+   * This is called by the AMs started on this node to register with the RM.
+   * This method does the initial authorization and then forwards the request to
+   * the application instance specific intercepter chain.
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    LOG.info("Registering application master." + " Host:"
+        + request.getHost() + " Port:" + request.getRpcPort()
+        + " Tracking Url:" + request.getTrackingUrl());
+    RequestInterceptorChainWrapper pipeline =
+        authorizeAndGetInterceptorChain();
+    return pipeline.getRootInterceptor()
+        .registerApplicationMaster(request);
+  }
+
+  /**
+   * This is called by the AMs started on this node to unregister from the RM.
+   * This method does the initial authorization and then forwards the request to
+   * the application instance specific intercepter chain.
+   */
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    LOG.info("Finishing application master. Tracking Url:"
+        + request.getTrackingUrl());
+    RequestInterceptorChainWrapper pipeline =
+        authorizeAndGetInterceptorChain();
+    return pipeline.getRootInterceptor().finishApplicationMaster(request);
+  }
+
+  /**
+   * This is called by the AMs started on this node to send heart beat to RM.
+   * This method does the initial authorization and then forwards the request to
+   * the application instance specific pipeline, which is a chain of request
+   * intercepter objects. One application request processing pipeline is created
+   * per AM instance.
+   */
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    AMRMTokenIdentifier amrmTokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
+    RequestInterceptorChainWrapper pipeline =
+        getInterceptorChain(amrmTokenIdentifier);
+    AllocateResponse allocateResponse =
+        pipeline.getRootInterceptor().allocate(request);
+
+    updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse);
+
+    return allocateResponse;
+  }
+
+  /**
+   * Callback from the ContainerManager implementation for initializing the
+   * application request processing pipeline.
+   *
+   * @param request - encapsulates information for starting an AM
+   * @throws IOException
+   * @throws YarnException
+   */
+  public void processApplicationStartRequest(StartContainerRequest request)
+      throws IOException, YarnException {
+    LOG.info("Callback received for initializing request "
+        + "processing pipeline for an AM");
+    ContainerTokenIdentifier containerTokenIdentifierForKey =
+        BuilderUtils.newContainerTokenIdentifier(request
+            .getContainerToken());
+    ApplicationAttemptId appAttemptId =
+        containerTokenIdentifierForKey.getContainerID()
+            .getApplicationAttemptId();
+    Credentials credentials =
+        YarnServerSecurityUtils.parseCredentials(request
+            .getContainerLaunchContext());
+
+    Token<AMRMTokenIdentifier> amrmToken =
+        getFirstAMRMToken(credentials.getAllTokens());
+    if (amrmToken == null) {
+      throw new YarnRuntimeException(
+          "AMRMToken not found in the start container request for application:"
+              + appAttemptId.toString());
+    }
+
+    // Substitute the existing AMRM Token with a local one. Keep the rest of the
+    // tokens in the credentials intact.
+    Token<AMRMTokenIdentifier> localToken =
+        this.secretManager.createAndGetAMRMToken(appAttemptId);
+    credentials.addToken(localToken.getService(), localToken);
+
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    request.getContainerLaunchContext().setTokens(
+        ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+
+    initializePipeline(containerTokenIdentifierForKey.getContainerID()
+        .getApplicationAttemptId(),
+        containerTokenIdentifierForKey.getApplicationSubmitter(),
+        amrmToken, localToken);
+  }
+
+  /**
+   * Initializes the request intercepter pipeline for the specified application.
+   * 
+   * @param applicationAttemptId
+   * @param user
+   * @param amrmToken
+   */
+  protected void initializePipeline(
+      ApplicationAttemptId applicationAttemptId, String user,
+      Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    RequestInterceptorChainWrapper chainWrapper = null;
+    synchronized (applPipelineMap) {
+      if (applPipelineMap.containsKey(applicationAttemptId.getApplicationId())) {
+        LOG.warn("Request to start an already existing appId was received. "
+            + " This can happen if an application failed and a new attempt "
+            + "was created on this machine.  ApplicationId: "
+            + applicationAttemptId.toString());
+        return;
+      }
+
+      chainWrapper = new RequestInterceptorChainWrapper();
+      this.applPipelineMap.put(applicationAttemptId.getApplicationId(),
+          chainWrapper);
+    }
+
+    // We register the pipeline instance in the map first and then initialize it
+    // later because chain initialization can be expensive and we would like to
+    // release the lock as soon as possible to prevent other applications from
+    // blocking when one application's chain is initializing
+    LOG.info("Initializing request processing pipeline for application. "
+        + " ApplicationId:" + applicationAttemptId + " for the user: "
+        + user);
+
+    RequestInterceptor interceptorChain =
+        this.createRequestInterceptorChain();
+    interceptorChain.init(createApplicationMasterContext(
+        applicationAttemptId, user, amrmToken, localToken));
+    chainWrapper.init(interceptorChain, applicationAttemptId);
+  }
+
+  /**
+   * Shuts down the request processing pipeline for the specified application
+   * attempt id.
+   *
+   * @param applicationId
+   */
+  protected void stopApplication(ApplicationId applicationId) {
+    Preconditions.checkArgument(applicationId != null,
+        "applicationId is null");
+    RequestInterceptorChainWrapper pipeline =
+        this.applPipelineMap.remove(applicationId);
+
+    if (pipeline == null) {
+      LOG.info("Request to stop an application that does not exist. Id:"
+          + applicationId);
+    } else {
+      LOG.info("Stopping the request processing pipeline for application: "
+          + applicationId);
+      try {
+        pipeline.getRootInterceptor().shutdown();
+      } catch (Throwable ex) {
+        LOG.warn(
+            "Failed to shutdown the request processing pipeline for app:"
+                + applicationId, ex);
+      }
+    }
+  }
+
+  private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
+      RequestInterceptorChainWrapper pipeline,
+      AllocateResponse allocateResponse) {
+    AMRMProxyApplicationContextImpl context =
+        (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor()
+            .getApplicationContext();
+
+    // check to see if the RM has issued a new AMRMToken & accordingly update
+    // the real ARMRMToken in the current context
+    if (allocateResponse.getAMRMToken() != null) {
+      org.apache.hadoop.yarn.api.records.Token token =
+          allocateResponse.getAMRMToken();
+
+      org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId =
+          new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
+              token.getIdentifier().array(), token.getPassword().array(),
+              new Text(token.getKind()), new Text(token.getService()));
+
+      context.setAMRMToken(newTokenId);
+    }
+
+    // Check if the local AMRMToken is rolled up and update the context and
+    // response accordingly
+    MasterKeyData nextMasterKey =
+        this.secretManager.getNextMasterKeyData();
+
+    if (nextMasterKey != null
+        && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+            .getKeyId()) {
+      Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
+      if (nextMasterKey.getMasterKey().getKeyId() != context
+          .getLocalAMRMTokenKeyId()) {
+        LOG.info("The local AMRMToken has been rolled-over."
+            + " Send new local AMRMToken back to application: "
+            + pipeline.getApplicationId());
+        localToken =
+            this.secretManager.createAndGetAMRMToken(pipeline
+                .getApplicationAttemptId());
+        context.setLocalAMRMToken(localToken);
+      }
+
+      allocateResponse
+          .setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+              .newInstance(localToken.getIdentifier(), localToken
+                  .getKind().toString(), localToken.getPassword(),
+                  localToken.getService().toString()));
+    }
+  }
+
+  private AMRMProxyApplicationContext createApplicationMasterContext(
+      ApplicationAttemptId applicationAttemptId, String user,
+      Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    AMRMProxyApplicationContextImpl appContext =
+        new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(),
+            applicationAttemptId, user, amrmToken, localToken);
+    return appContext;
+  }
+
+  /**
+   * Gets the Request intercepter chains for all the applications.
+   * 
+   * @return the request intercepter chains.
+   */
+  protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
+    return this.applPipelineMap;
+  }
+
+  /**
+   * This method creates and returns reference of the first intercepter in the
+   * chain of request intercepter instances.
+   *
+   * @return the reference of the first intercepter in the chain
+   */
+  protected RequestInterceptor createRequestInterceptorChain() {
+    Configuration conf = getConfig();
+
+    List<String> interceptorClassNames = getInterceptorClassNames(conf);
+
+    RequestInterceptor pipeline = null;
+    RequestInterceptor current = null;
+    for (String interceptorClassName : interceptorClassNames) {
+      try {
+        Class<?> interceptorClass =
+            conf.getClassByName(interceptorClassName);
+        if (RequestInterceptor.class.isAssignableFrom(interceptorClass)) {
+          RequestInterceptor interceptorInstance =
+              (RequestInterceptor) ReflectionUtils.newInstance(
+                  interceptorClass, conf);
+          if (pipeline == null) {
+            pipeline = interceptorInstance;
+            current = interceptorInstance;
+            continue;
+          } else {
+            current.setNextInterceptor(interceptorInstance);
+            current = interceptorInstance;
+          }
+        } else {
+          throw new YarnRuntimeException("Class: " + interceptorClassName
+              + " not instance of "
+              + RequestInterceptor.class.getCanonicalName());
+        }
+      } catch (ClassNotFoundException e) {
+        throw new YarnRuntimeException(
+            "Could not instantiate ApplicationMasterRequestInterceptor: "
+                + interceptorClassName, e);
+      }
+    }
+
+    if (pipeline == null) {
+      throw new YarnRuntimeException(
+          "RequestInterceptor pipeline is not configured in the system");
+    }
+    return pipeline;
+  }
+
+  /**
+   * Returns the comma separated intercepter class names from the configuration.
+   *
+   * @param conf
+   * @return the intercepter class names as an instance of ArrayList
+   */
+  private List<String> getInterceptorClassNames(Configuration conf) {
+    String configuredInterceptorClassNames =
+        conf.get(
+            YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
+
+    List<String> interceptorClassNames = new ArrayList<String>();
+    Collection<String> tempList =
+        StringUtils.getStringCollection(configuredInterceptorClassNames);
+    for (String item : tempList) {
+      interceptorClassNames.add(item.trim());
+    }
+
+    return interceptorClassNames;
+  }
+
+  /**
+   * Authorizes the request and returns the application specific request
+   * processing pipeline.
+   *
+   * @return the the intercepter wrapper instance
+   * @throws YarnException
+   */
+  private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
+      throws YarnException {
+    AMRMTokenIdentifier tokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
+    return getInterceptorChain(tokenIdentifier);
+  }
+
+  private RequestInterceptorChainWrapper getInterceptorChain(
+      AMRMTokenIdentifier tokenIdentifier) throws YarnException {
+    ApplicationAttemptId appAttemptId =
+        tokenIdentifier.getApplicationAttemptId();
+
+    synchronized (this.applPipelineMap) {
+      if (!this.applPipelineMap.containsKey(appAttemptId
+          .getApplicationId())) {
+        throw new YarnException(
+            "The AM request processing pipeline is not initialized for app: "
+                + appAttemptId.getApplicationId().toString());
+      }
+
+      return this.applPipelineMap.get(appAttemptId.getApplicationId());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Token<AMRMTokenIdentifier> getFirstAMRMToken(
+      Collection<Token<? extends TokenIdentifier>> allTokens) {
+    Iterator<Token<? extends TokenIdentifier>> iter = allTokens.iterator();
+    while (iter.hasNext()) {
+      Token<? extends TokenIdentifier> token = iter.next();
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        return (Token<AMRMTokenIdentifier>) token;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Private class for handling application stop events.
+   *
+   */
+  class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
+
+    @Override
+    public void handle(ApplicationEvent event) {
+      Application app =
+          AMRMProxyService.this.nmContext.getApplications().get(
+              event.getApplicationID());
+      if (app != null) {
+        switch (event.getType()) {
+        case FINISH_APPLICATION:
+          LOG.info("Application stop event received for stopping AppId:"
+              + event.getApplicationID().toString());
+          AMRMProxyService.this.stopApplication(event.getApplicationID());
+          break;
+        default:
+          LOG.debug("AMRMProxy is ignoring event: " + event.getType());
+          break;
+        }
+      } else {
+        LOG.warn("Event " + event + " sent to absent application "
+            + event.getApplicationID());
+      }
+    }
+  }
+
+  /**
+   * Private structure for encapsulating RequestInterceptor and
+   * ApplicationAttemptId instances.
+   *
+   */
+  private static class RequestInterceptorChainWrapper {
+    private RequestInterceptor rootInterceptor;
+    private ApplicationAttemptId applicationAttemptId;
+
+    /**
+     * Initializes the wrapper with the specified parameters.
+     * 
+     * @param rootInterceptor
+     * @param applicationAttemptId
+     */
+    public synchronized void init(RequestInterceptor rootInterceptor,
+        ApplicationAttemptId applicationAttemptId) {
+      this.rootInterceptor = rootInterceptor;
+      this.applicationAttemptId = applicationAttemptId;
+    }
+
+    /**
+     * Gets the root request intercepter.
+     * 
+     * @return the root request intercepter
+     */
+    public synchronized RequestInterceptor getRootInterceptor() {
+      return rootInterceptor;
+    }
+
+    /**
+     * Gets the application attempt identifier.
+     * 
+     * @return the application attempt identifier
+     */
+    public synchronized ApplicationAttemptId getApplicationAttemptId() {
+      return applicationAttemptId;
+    }
+
+    /**
+     * Gets the application identifier.
+     * 
+     * @return the application identifier
+     */
+    public synchronized ApplicationId getApplicationId() {
+      return applicationAttemptId.getApplicationId();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
new file mode 100644
index 0000000..d09ce41
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This secret manager instance is used by the AMRMProxyService to generate and
+ * manage tokens.
+ */
+public class AMRMProxyTokenSecretManager extends
+    SecretManager<AMRMTokenIdentifier> {
+
+  private static final Log LOG = LogFactory
+      .getLog(AMRMProxyTokenSecretManager.class);
+
+  private int serialNo = new SecureRandom().nextInt();
+  private MasterKeyData nextMasterKey;
+  private MasterKeyData currentMasterKey;
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  private final Timer timer;
+  private final long rollingInterval;
+  private final long activationDelay;
+
+  private final Set<ApplicationAttemptId> appAttemptSet =
+      new HashSet<ApplicationAttemptId>();
+
+  /**
+   * Create an {@link AMRMProxyTokenSecretManager}.
+   */
+  public AMRMProxyTokenSecretManager(Configuration conf) {
+    this.timer = new Timer();
+    this.rollingInterval =
+        conf.getLong(
+            YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+            YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+    // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+    // the updated shared-key.
+    this.activationDelay =
+        (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+    LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+        + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay
+        + " ms");
+    if (rollingInterval <= activationDelay * 2) {
+      throw new IllegalArgumentException(
+          YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+              + " should be more than 3 X "
+              + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+    }
+  }
+
+  public void start() {
+    if (this.currentMasterKey == null) {
+      this.currentMasterKey = createNewMasterKey();
+    }
+    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+        rollingInterval);
+  }
+
+  public void stop() {
+    this.timer.cancel();
+  }
+
+  public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Application finished, removing password for "
+          + appAttemptId);
+      this.appAttemptSet.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  private class MasterKeyRoller extends TimerTask {
+    @Override
+    public void run() {
+      rollMasterKey();
+    }
+  }
+
+  @Private
+  void rollMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Rolling master-key for amrm-tokens");
+      this.nextMasterKey = createNewMasterKey();
+      this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  private class NextKeyActivator extends TimerTask {
+    @Override
+    public void run() {
+      activateNextMasterKey();
+    }
+  }
+
+  public void activateNextMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Activating next master key with id: "
+          + this.nextMasterKey.getMasterKey().getKeyId());
+      this.currentMasterKey = this.nextMasterKey;
+      this.nextMasterKey = null;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData createNewMasterKey() {
+    this.writeLock.lock();
+    try {
+      return new MasterKeyData(serialNo++, generateSecret());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+      ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+      AMRMTokenIdentifier identifier =
+          new AMRMTokenIdentifier(appAttemptId, getMasterKey()
+              .getMasterKey().getKeyId());
+      byte[] password = this.createPassword(identifier);
+      appAttemptSet.add(appAttemptId);
+      return new Token<AMRMTokenIdentifier>(identifier.getBytes(),
+          password, identifier.getKind(), new Text());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  // If nextMasterKey is not Null, then return nextMasterKey
+  // otherwise return currentMasterKey.
+  @VisibleForTesting
+  public MasterKeyData getMasterKey() {
+    this.readLock.lock();
+    try {
+      return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Retrieve the password for the given {@link AMRMTokenIdentifier}. Used by
+   * RPC layer to validate a remote {@link AMRMTokenIdentifier}.
+   */
+  @Override
+  public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+      throws InvalidToken {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to retrieve password for "
+            + applicationAttemptId);
+      }
+      if (!appAttemptSet.contains(applicationAttemptId)) {
+        throw new InvalidToken(applicationAttemptId
+            + " not found in AMRMProxyTokenSecretManager.");
+      }
+      if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+          .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+            this.currentMasterKey.getSecretKey());
+      } else if (nextMasterKey != null
+          && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+              .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+            this.nextMasterKey.getSecretKey());
+      }
+      throw new InvalidToken("Invalid AMRMToken from "
+          + applicationAttemptId);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Creates an empty TokenId to be used for de-serializing an
+   * {@link AMRMTokenIdentifier} by the RPC layer.
+   */
+  @Override
+  public AMRMTokenIdentifier createIdentifier() {
+    return new AMRMTokenIdentifier();
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getNextMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  @Private
+  protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      LOG.info("Creating password for " + applicationAttemptId);
+      return createPassword(identifier.getBytes(), getMasterKey()
+          .getSecretKey());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
new file mode 100644
index 0000000..810dfa8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implements the RequestInterceptor interface and provides common functionality
+ * which can can be used and/or extended by other concrete intercepter classes.
+ *
+ */
+public abstract class AbstractRequestInterceptor implements
+    RequestInterceptor {
+  private Configuration conf;
+  private AMRMProxyApplicationContext appContext;
+  private RequestInterceptor nextInterceptor;
+
+  /**
+   * Sets the {@link RequestInterceptor} in the chain.
+   */
+  @Override
+  public void setNextInterceptor(RequestInterceptor nextInterceptor) {
+    this.nextInterceptor = nextInterceptor;
+  }
+
+  /**
+   * Sets the {@link Configuration}.
+   */
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.setConf(conf);
+    }
+  }
+
+  /**
+   * Gets the {@link Configuration}.
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Initializes the {@link RequestInterceptor}.
+   */
+  @Override
+  public void init(AMRMProxyApplicationContext appContext) {
+    Preconditions.checkState(this.appContext == null,
+        "init is called multiple times on this interceptor: "
+            + this.getClass().getName());
+    this.appContext = appContext;
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.init(appContext);
+    }
+  }
+
+  /**
+   * Disposes the {@link RequestInterceptor}.
+   */
+  @Override
+  public void shutdown() {
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.shutdown();
+    }
+  }
+
+  /**
+   * Gets the next {@link RequestInterceptor} in the chain.
+   */
+  @Override
+  public RequestInterceptor getNextInterceptor() {
+    return this.nextInterceptor;
+  }
+
+  /**
+   * Gets the {@link AMRMProxyApplicationContext}.
+   */
+  public AMRMProxyApplicationContext getApplicationContext() {
+    return this.appContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
new file mode 100644
index 0000000..2c7939b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the AbstractRequestInterceptor class and provides an implementation
+ * that simply forwards the AM requests to the cluster resource manager.
+ *
+ */
+public final class DefaultRequestInterceptor extends
+    AbstractRequestInterceptor {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(DefaultRequestInterceptor.class);
+  private ApplicationMasterProtocol rmClient;
+  private UserGroupInformation user = null;
+
+  @Override
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    try {
+      user =
+          UserGroupInformation.createProxyUser(appContext
+              .getApplicationAttemptId().toString(), UserGroupInformation
+              .getCurrentUser());
+      user.addToken(appContext.getAMRMToken());
+      final Configuration conf = this.getConf();
+
+      rmClient =
+          user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+            @Override
+            public ApplicationMasterProtocol run() throws Exception {
+              return ClientRMProxy.createRMProxy(conf,
+                  ApplicationMasterProtocol.class);
+            }
+          });
+    } catch (IOException e) {
+      String message =
+          "Error while creating of RM app master service proxy for attemptId:"
+              + appContext.getApplicationAttemptId().toString();
+      if (user != null) {
+        message += ", user: " + user;
+      }
+
+      LOG.info(message);
+      throw new YarnRuntimeException(message, e);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      final RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
+    LOG.info("Forwarding registration request to the real YARN RM");
+    return rmClient.registerApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(final AllocateRequest request)
+      throws YarnException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Forwarding allocate request to the real YARN RM");
+    }
+    AllocateResponse allocateResponse = rmClient.allocate(request);
+    if (allocateResponse.getAMRMToken() != null) {
+      updateAMRMToken(allocateResponse.getAMRMToken());
+    }
+
+    return allocateResponse;
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      final FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    LOG.info("Forwarding finish application request to "
+        + "the real YARN Resource Manager");
+    return rmClient.finishApplicationMaster(request);
+  }
+
+  @Override
+  public void setNextInterceptor(RequestInterceptor next) {
+    throw new YarnRuntimeException(
+        "setNextInterceptor is being called on DefaultRequestInterceptor,"
+            + "which should be the last one in the chain "
+            + "Check if the interceptor pipeline configuration is correct");
+  }
+
+  private void updateAMRMToken(Token token) throws IOException {
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+        new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
+            token.getIdentifier().array(), token.getPassword().array(),
+            new Text(token.getKind()), new Text(token.getService()));
+    // Preserve the token service sent by the RM when adding the token
+    // to ensure we replace the previous token setup by the RM.
+    // Afterwards we can update the service address for the RPC layer.
+    user.addToken(amrmToken);
+    amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
new file mode 100644
index 0000000..c74c88f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+
+/**
+ * Defines the contract to be implemented by the request intercepter classes,
+ * that can be used to intercept and inspect messages sent from the application
+ * master to the resource manager.
+ */
+public interface RequestInterceptor extends ApplicationMasterProtocol,
+    Configurable {
+  /**
+   * This method is called for initializing the intercepter. This is guaranteed
+   * to be called only once in the lifetime of this instance.
+   *
+   * @param ctx
+   */
+  void init(AMRMProxyApplicationContext ctx);
+
+  /**
+   * This method is called to release the resources held by the intercepter.
+   * This will be called when the application pipeline is being destroyed. The
+   * concrete implementations should dispose the resources and forward the
+   * request to the next intercepter, if any.
+   */
+  void shutdown();
+
+  /**
+   * Sets the next intercepter in the pipeline. The concrete implementation of
+   * this interface should always pass the request to the nextInterceptor after
+   * inspecting the message. The last intercepter in the chain is responsible to
+   * send the messages to the resource manager service and so the last
+   * intercepter will not receive this method call.
+   *
+   * @param nextInterceptor
+   */
+  void setNextInterceptor(RequestInterceptor nextInterceptor);
+
+  /**
+   * Returns the next intercepter in the chain.
+   * 
+   * @return the next intercepter in the chain
+   */
+  RequestInterceptor getNextInterceptor();
+
+  /**
+   * Returns the context.
+   * 
+   * @return the context
+   */
+  AMRMProxyApplicationContext getApplicationContext();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 68c7f2c..a658e53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -42,7 +42,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
@@ -51,7 +50,6 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
@@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -103,6 +102,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -135,6 +135,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
@@ -172,6 +173,8 @@ public class ContainerManagerImpl extends CompositeService implements
   private boolean serviceStopped = false;
   private final ReadLock readLock;
   private final WriteLock writeLock;
+  private AMRMProxyService amrmProxyService;
+  private boolean amrmProxyEnabled = false;
 
   private long waitForContainersOnShutdownMillis;
 
@@ -235,6 +238,20 @@ public class ContainerManagerImpl extends CompositeService implements
     addService(sharedCacheUploader);
     dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
 
+    amrmProxyEnabled =
+        conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+    if (amrmProxyEnabled) {
+      LOG.info("AMRMProxyService is enabled. "
+          + "All the AM->RM requests will be intercepted by the proxy");
+      this.amrmProxyService =
+          new AMRMProxyService(this.context, this.dispatcher);
+      addService(this.amrmProxyService);
+    } else {
+      LOG.info("AMRMProxyService is disabled");
+    }
+
     waitForContainersOnShutdownMillis =
         conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
             YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
@@ -246,6 +263,10 @@ public class ContainerManagerImpl extends CompositeService implements
     recover();
   }
 
+  public boolean isARMRMProxyEnabled() {
+    return amrmProxyEnabled;
+  }
+
   @SuppressWarnings("unchecked")
   private void recover() throws IOException, URISyntaxException {
     NMStateStoreService stateStore = context.getNMStateStore();
@@ -314,7 +335,8 @@ public class ContainerManagerImpl extends CompositeService implements
         + " with exit code " + rcs.getExitCode());
 
     if (context.getApplications().containsKey(appId)) {
-      Credentials credentials = parseCredentials(launchContext);
+      Credentials credentials =
+          YarnServerSecurityUtils.parseCredentials(launchContext);
       Container container = new ContainerImpl(getConfig(), dispatcher,
           context.getNMStateStore(), req.getContainerLaunchContext(),
           credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
@@ -737,8 +759,17 @@ public class ContainerManagerImpl extends CompositeService implements
         verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
           containerTokenIdentifier);
         containerId = containerTokenIdentifier.getContainerID();
-        startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
-          request);
+
+        // Initialize the AMRMProxy service instance only if the container is of
+        // type AM and if the AMRMProxy service is enabled
+        if (isARMRMProxyEnabled()
+            && containerTokenIdentifier.getContainerType().equals(
+                ContainerType.APPLICATION_MASTER)) {
+          this.amrmProxyService.processApplicationStartRequest(request);
+        }
+
+        startContainerInternal(nmTokenIdentifier,
+            containerTokenIdentifier, request);
         succeededContainers.add(containerId);
       } catch (YarnException e) {
         failedContainers.put(containerId, SerializedException.newInstance(e));
@@ -751,7 +782,7 @@ public class ContainerManagerImpl extends CompositeService implements
     }
 
     return StartContainersResponse.newInstance(getAuxServiceMetaData(),
-      succeededContainers, failedContainers);
+        succeededContainers, failedContainers);
   }
 
   private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
@@ -844,7 +875,8 @@ public class ContainerManagerImpl extends CompositeService implements
       }
     }
 
-    Credentials credentials = parseCredentials(launchContext);
+    Credentials credentials =
+        YarnServerSecurityUtils.parseCredentials(launchContext);
 
     Container container =
         new ContainerImpl(getConfig(), this.dispatcher,
@@ -928,27 +960,6 @@ public class ContainerManagerImpl extends CompositeService implements
       nmTokenIdentifier);
   }
 
-  private Credentials parseCredentials(ContainerLaunchContext launchContext)
-      throws IOException {
-    Credentials credentials = new Credentials();
-    // //////////// Parse credentials
-    ByteBuffer tokens = launchContext.getTokens();
-
-    if (tokens != null) {
-      DataInputByteBuffer buf = new DataInputByteBuffer();
-      tokens.rewind();
-      buf.reset(tokens);
-      credentials.readTokenStorageStream(buf);
-      if (LOG.isDebugEnabled()) {
-        for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
-          LOG.debug(tk.getService() + " = " + tk.toString());
-        }
-      }
-    }
-    // //////////// End of parsing credentials
-    return credentials;
-  }
-
   /**
    * Stop a list of containers running on this NodeManager.
    */