You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/11 19:20:32 UTC

[11/42] hadoop git commit: YARN-2884. Added a proxy service in NM to proxy the communication between AM and RM. Contributed by Kishore Chaliparambil

YARN-2884. Added a proxy service in NM to proxy the communication between AM and RM. Contributed by Kishore Chaliparambil


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f72f1e6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f72f1e6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f72f1e6

Branch: refs/heads/YARN-1197
Commit: 6f72f1e6003ab11679bebeb96f27f1f62b3b3e02
Parents: 9b78e6e
Author: Jian He <ji...@apache.org>
Authored: Tue Sep 8 09:35:46 2015 +0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Sep 8 09:35:46 2015 +0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  17 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   2 +
 .../src/main/resources/yarn-default.xml         |  34 +
 .../server/utils/YarnServerSecurityUtils.java   | 142 ++++
 .../amrmproxy/AMRMProxyApplicationContext.java  |  70 ++
 .../AMRMProxyApplicationContextImpl.java        | 132 ++++
 .../nodemanager/amrmproxy/AMRMProxyService.java | 592 ++++++++++++++++
 .../amrmproxy/AMRMProxyTokenSecretManager.java  | 265 ++++++++
 .../amrmproxy/AbstractRequestInterceptor.java   | 102 +++
 .../amrmproxy/DefaultRequestInterceptor.java    | 138 ++++
 .../amrmproxy/RequestInterceptor.java           |  71 ++
 .../containermanager/ContainerManagerImpl.java  |  67 +-
 .../amrmproxy/BaseAMRMProxyTest.java            | 677 +++++++++++++++++++
 .../amrmproxy/MockRequestInterceptor.java       |  65 ++
 .../amrmproxy/MockResourceManagerFacade.java    | 469 +++++++++++++
 .../PassThroughRequestInterceptor.java          |  58 ++
 .../amrmproxy/TestAMRMProxyService.java         | 484 +++++++++++++
 .../ApplicationMasterService.java               |  69 +-
 19 files changed, 3366 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a162070..be1feb4 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -187,6 +187,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3970. Add REST api support for Application Priority.
     (Naganarasimha G R via vvasudev)
 
+    YARN-2884. Added a proxy service in NM to proxy the communication
+    between AM and RM. (Kishore Chaliparambil via jianhe) 
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4879ca1..9ec25ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1332,6 +1332,23 @@ public class YarnConfiguration extends Configuration {
   public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
       + "application.classpath";
 
+  public static final String AMRM_PROXY_ENABLED = NM_PREFIX
+      + "amrmproxy.enable";
+  public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
+  public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+      + "amrmproxy.address";
+  public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
+  public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
+      + DEFAULT_AMRM_PROXY_PORT;
+  public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
+      + "amrmproxy.client.thread-count";
+  public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
+  public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+      NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
+  public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+      "org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
+          + "DefaultRequestInterceptor";
+
   /**
    * Default platform-agnostic CLASSPATH for YARN applications. A
    * comma-separated list of CLASSPATH entries. The parameter expansion marker

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index e89a90d..97fcfa1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -86,6 +86,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
         .add(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
     configurationPropsToSkipCompare
         .add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
 
     // Ignore all YARN Application Timeline Service (version 1) properties
     configurationPrefixToSkipCompare.add("yarn.timeline-service.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 59bfb56..b76defb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2259,4 +2259,38 @@
     <value></value>
   </property>
 
+  <property>
+    <description>
+    Enable/Disable AMRMProxyService in the node manager. This service is used to intercept
+    calls from the application masters to the resource manager.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.enable</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+    The address of the AMRMProxyService listener.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.address</name>
+    <value>0.0.0.0:8048</value>
+  </property>
+
+  <property>
+    <description>
+    The number of threads used to handle requests by the AMRMProxyService.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.client.thread-count</name>
+    <value>25</value>
+  </property>
+
+  <property>
+    <description>
+    The comma separated list of class names that implement the RequestInterceptor interface. This is used by the
+    AMRMProxyService to create the request processing pipeline for applications.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
+    <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
new file mode 100644
index 0000000..9af556e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that contains commonly used server-side security methods:
+ * AMRMToken based request authorization and launch-context token parsing.
+ */
+@Private
+public final class YarnServerSecurityUtils {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(YarnServerSecurityUtils.class);
+
+  private YarnServerSecurityUtils() {
+  }
+
+  /**
+   * Authorizes the current request and returns the AMRMTokenIdentifier for the
+   * current application, i.e. the first one carried by the current user's UGI.
+   *
+   * @return the AMRMTokenIdentifier instance for the current user
+   * @throws YarnException if the current user or its AMRMToken is unavailable
+   */
+  public static AMRMTokenIdentifier authorizeRequest()
+      throws YarnException {
+
+    UserGroupInformation remoteUgi;
+    try {
+      remoteUgi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      String msg =
+          "Cannot obtain the user-name for authorizing ApplicationMaster. "
+              + "Got exception: " + StringUtils.stringifyException(e);
+      LOG.warn(msg);
+      throw RPCUtil.getRemoteException(msg);
+    }
+
+    boolean tokenFound = false;
+    String message = "";
+    AMRMTokenIdentifier appTokenIdentifier = null;
+    try {
+      appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
+      if (appTokenIdentifier == null) {
+        tokenFound = false;
+        message = "No AMRMToken found for user " + remoteUgi.getUserName();
+      } else {
+        tokenFound = true;
+      }
+    } catch (IOException e) {
+      // tokenFound is still false here; keep the cause instead of dropping it.
+      message = "Got exception while looking for AMRMToken for user "
+          + remoteUgi.getUserName() + ": "
+          + StringUtils.stringifyException(e);
+    }
+
+    if (!tokenFound) {
+      LOG.warn(message);
+      throw RPCUtil.getRemoteException(message);
+    }
+
+    return appTokenIdentifier;
+  }
+
+  // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
+  // currently sets only the required id, but iterate through anyways just to be
+  // sure.
+  private static AMRMTokenIdentifier selectAMRMTokenIdentifier(
+      UserGroupInformation remoteUgi) throws IOException {
+    AMRMTokenIdentifier result = null;
+    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
+    for (TokenIdentifier tokenId : tokenIds) {
+      if (tokenId instanceof AMRMTokenIdentifier) {
+        result = (AMRMTokenIdentifier) tokenId;
+        break;
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Parses the container launch context and returns a Credentials instance
+   * that contains all the tokens from the launch context.
+   * @param launchContext the launch context whose tokens are read
+   * @return the credentials; empty when the launch context has no tokens
+   * @throws IOException if the token storage stream cannot be read
+   */
+  public static Credentials parseCredentials(
+      ContainerLaunchContext launchContext) throws IOException {
+    Credentials credentials = new Credentials();
+    ByteBuffer tokens = launchContext.getTokens();
+
+    if (tokens != null) {
+      DataInputByteBuffer buf = new DataInputByteBuffer();
+      tokens.rewind();
+      buf.reset(tokens);
+      credentials.readTokenStorageStream(buf);
+      if (LOG.isDebugEnabled()) {
+        for (Token<? extends TokenIdentifier> tk : credentials
+            .getAllTokens()) {
+          LOG.debug(tk.getService() + " = " + tk.toString());
+        }
+      }
+    }
+
+    return credentials;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
new file mode 100644
index 0000000..c355a8b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Interface that can be used by the interceptor plugins to get the information
+ * about one application.
+ *
+ */
+public interface AMRMProxyApplicationContext {
+
+  /**
+   * Gets the configuration object instance.
+   * @return the configuration object.
+   */
+  Configuration getConf();
+
+  /**
+   * Gets the application attempt identifier.
+   * @return the application attempt identifier.
+   */
+  ApplicationAttemptId getApplicationAttemptId();
+
+  /**
+   * Gets the application submitter.
+   * @return the application submitter
+   */
+  String getUser();
+
+  /**
+   * Gets the application's AMRMToken that is issued by the RM.
+   * @return the application's AMRMToken that is issued by the RM.
+   */
+  Token<AMRMTokenIdentifier> getAMRMToken();
+
+  /**
+   * Gets the application's local AMRMToken issued by the proxy service.
+   * @return the application's local AMRMToken issued by the proxy service.
+   */
+  Token<AMRMTokenIdentifier> getLocalAMRMToken();
+
+  /**
+   * Gets the NMContext object ("Cotext" in the method name is a typo).
+   * @return the NMContext.
+   */
+  Context getNMCotext();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
new file mode 100644
index 0000000..2e5aa94
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Encapsulates the information about one application that is needed by the
+ * request interceptors.
+ */
+public class AMRMProxyApplicationContextImpl implements
+    AMRMProxyApplicationContext {
+  private final Configuration conf;
+  private final Context nmContext;
+  private final ApplicationAttemptId applicationAttemptId;
+  private final String user;
+  private Integer localTokenKeyId;
+  private Token<AMRMTokenIdentifier> amrmToken;
+  private Token<AMRMTokenIdentifier> localToken;
+
+  /**
+   * Create an instance of the AMRMProxyApplicationContext.
+   * @param nmContext the node manager context
+   * @param conf the configuration
+   * @param applicationAttemptId the application attempt identifier
+   * @param user the application submitter
+   * @param amrmToken the application's AMRMToken that is issued by the RM
+   * @param localToken the application's local AMRMToken issued by the proxy
+   */
+  public AMRMProxyApplicationContextImpl(Context nmContext,
+      Configuration conf, ApplicationAttemptId applicationAttemptId,
+      String user, Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    this.nmContext = nmContext;
+    this.conf = conf;
+    this.applicationAttemptId = applicationAttemptId;
+    this.user = user;
+    this.amrmToken = amrmToken;
+    this.localToken = localToken;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public synchronized Token<AMRMTokenIdentifier> getAMRMToken() {
+    return amrmToken;
+  }
+
+  /**
+   * Sets the application's AMRMToken that is issued by the RM.
+   */
+  public synchronized void setAMRMToken(
+      Token<AMRMTokenIdentifier> amrmToken) {
+    this.amrmToken = amrmToken;
+  }
+
+  @Override
+  public synchronized Token<AMRMTokenIdentifier> getLocalAMRMToken() {
+    return this.localToken;
+  }
+
+  /**
+   * Sets the application's local AMRMToken and clears the cached key id.
+   */
+  public synchronized void setLocalAMRMToken(
+      Token<AMRMTokenIdentifier> localToken) {
+    this.localToken = localToken;
+    this.localTokenKeyId = null;
+  }
+
+  /** Gets the key id of the local AMRMToken, decoding and caching it once. */
+  @Private
+  public synchronized int getLocalAMRMTokenKeyId() {
+    Integer keyId = this.localTokenKeyId;
+    if (keyId == null) {
+      try {
+        if (this.localToken == null) {
+          throw new YarnRuntimeException("Missing AMRM token for "
+              + this.applicationAttemptId);
+        }
+        keyId = this.localToken.decodeIdentifier().getKeyId();
+        this.localTokenKeyId = keyId;
+      } catch (IOException e) {
+        throw new YarnRuntimeException("AMRM token decode error for "
+            + this.applicationAttemptId, e);
+      }
+    }
+    return keyId;
+  }
+
+  @Override
+  public Context getNMCotext() {
+    return nmContext;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
new file mode 100644
index 0000000..bd6538c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -0,0 +1,592 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * AMRMProxyService is a service that runs on each node manager that can be used
+ * to intercept and inspect messages from application master to the cluster
+ * resource manager. It listens to messages from the application master and
+ * creates a request intercepting pipeline instance for each application. The
+ * pipeline is a chain of intercepter instances that can inspect and modify the
+ * request/response as needed.
+ */
+public class AMRMProxyService extends AbstractService implements
+    ApplicationMasterProtocol {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AMRMProxyService.class);
+  private Server server;
+  private final Context nmContext;
+  private final AsyncDispatcher dispatcher;
+  private InetSocketAddress listenerEndpoint;
+  private AMRMProxyTokenSecretManager secretManager;
+  private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
+
+  /**
+   * Creates an instance of the service and registers its application event
+   * handler with the dispatcher.
+   * @param nmContext the node manager context; must not be null
+   * @param dispatcher the dispatcher to register with; must not be null
+   */
+  public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
+    super(AMRMProxyService.class.getName());
+    Preconditions.checkArgument(nmContext != null, "nmContext is null");
+    Preconditions.checkArgument(dispatcher != null, "dispatcher is null");
+    this.nmContext = nmContext;
+    this.dispatcher = dispatcher;
+    this.applPipelineMap =
+        new ConcurrentHashMap<ApplicationId, RequestInterceptorChainWrapper>();
+
+    this.dispatcher.register(ApplicationEventType.class,
+        new ApplicationEventHandler());
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting AMRMProxyService");
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    UserGroupInformation.setConfiguration(conf);
+    // Address the proxy's RPC server will listen on.
+    this.listenerEndpoint =
+        conf.getSocketAddr(YarnConfiguration.AMRM_PROXY_ADDRESS,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_PORT);
+    // Private copy of the configuration that forces TOKEN authentication.
+    Configuration serverConf = new Configuration(conf);
+    serverConf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        SaslRpcServer.AuthMethod.TOKEN.toString());
+    // Number of worker threads handed to the RPC server below.
+    int numWorkerThreads =
+        serverConf.getInt(
+            YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT);
+    // Start the secret manager before the RPC server that is given it.
+    this.secretManager = new AMRMProxyTokenSecretManager(serverConf);
+    this.secretManager.start();
+
+    this.server =
+        rpc.getServer(ApplicationMasterProtocol.class, this,
+            listenerEndpoint, serverConf, this.secretManager,
+            numWorkerThreads);
+
+    this.server.start();
+    LOG.info("AMRMProxyService listening on address: "
+        + this.server.getListenerAddress());
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping AMRMProxyService");
+    if (this.server != null) {
+      this.server.stop();
+    }
+    if (this.secretManager != null) { // null if serviceStart() failed early
+      this.secretManager.stop();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Entry point for AMs on this node that want to register with the RM. After
+   * logging the caller's host, port and tracking URL, the request is authorized
+   * and handed to the root interceptor of the application's own chain.
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    String hostAndPort =
+        " Host:" + request.getHost() + " Port:" + request.getRpcPort();
+    LOG.info("Registering application master." + hostAndPort
+        + " Tracking Url:" + request.getTrackingUrl());
+    RequestInterceptorChainWrapper chain = authorizeAndGetInterceptorChain();
+    return chain.getRootInterceptor()
+        .registerApplicationMaster(request);
+  }
+
+  /**
+   * Entry point for AMs on this node that want to unregister from the RM. The
+   * request is authorized first and then passed to the root interceptor of the
+   * application's own interceptor chain.
+   */
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    String trackingUrl = request.getTrackingUrl();
+    LOG.info("Finishing application master. Tracking Url:" + trackingUrl);
+    RequestInterceptorChainWrapper chain = authorizeAndGetInterceptorChain();
+    return chain.getRootInterceptor()
+        .finishApplicationMaster(request);
+  }
+
+  /**
+   * Heart beat entry point for the AMs started on this node. The caller is
+   * authorized first; its AMRMTokenIdentifier is then used to look up the
+   * application's pipeline, a chain of request interceptor objects, and the
+   * request is forwarded to that pipeline's root interceptor. One application
+   * request processing pipeline is created per AM instance.
+   */
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    // Authorize the caller; its token identifier selects the pipeline.
+    AMRMTokenIdentifier callerTokenId =
+        YarnServerSecurityUtils.authorizeRequest();
+    RequestInterceptorChainWrapper chain = getInterceptorChain(callerTokenId);
+    AllocateResponse response = chain.getRootInterceptor().allocate(request);
+
+    // Runs only after the interceptor chain has produced the response.
+    updateAMRMTokens(callerTokenId, chain, response);
+    return response;
+  }
+
+  /**
+   * Invoked by the ContainerManager implementation when an AM container is
+   * being started: replaces the AMRMToken in the launch context with a local
+   * one and sets up the request processing pipeline for the application.
+   *
+   * @param request - encapsulates information for starting an AM
+   * @throws IOException if the tokens of the request cannot be read or written
+   * @throws YarnException declared in the signature; not thrown directly here
+   */
+  public void processApplicationStartRequest(StartContainerRequest request)
+      throws IOException, YarnException {
+    LOG.info("Callback received for initializing request "
+        + "processing pipeline for an AM");
+    // The container token names the application attempt being started.
+    ContainerTokenIdentifier containerTokenId =
+        BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
+    ApplicationAttemptId appAttemptId =
+        containerTokenId.getContainerID().getApplicationAttemptId();
+    Credentials credentials = YarnServerSecurityUtils
+        .parseCredentials(request.getContainerLaunchContext());
+
+    Token<AMRMTokenIdentifier> amrmToken =
+        getFirstAMRMToken(credentials.getAllTokens());
+    if (amrmToken == null) {
+      throw new YarnRuntimeException(
+          "AMRMToken not found in the start container request for application:"
+              + appAttemptId.toString());
+    }
+
+    // Substitute the AMRMToken found above with one created by the local
+    // secret manager; every other token in the credentials is left untouched.
+    Token<AMRMTokenIdentifier> localToken =
+        this.secretManager.createAndGetAMRMToken(appAttemptId);
+    credentials.addToken(localToken.getService(), localToken);
+
+    // Write the updated credentials back into the launch context.
+    DataOutputBuffer serialized = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(serialized);
+    ByteBuffer tokenBytes =
+        ByteBuffer.wrap(serialized.getData(), 0, serialized.getLength());
+    request.getContainerLaunchContext().setTokens(tokenBytes);
+
+    // Build the interceptor chain for the application of this attempt.
+    initializePipeline(appAttemptId,
+        containerTokenId.getApplicationSubmitter(), amrmToken, localToken);
+  }
+
+  /**
+   * Initializes the request interceptor pipeline of the given application.
+   *
+   * @param applicationAttemptId attempt whose application owns the pipeline
+   * @param user user name passed on to the application context
+   * @param amrmToken the AMRMToken passed on to the application context
+   * @param localToken the local AMRMToken passed on to the application context
+   */
+  protected void initializePipeline(
+      ApplicationAttemptId applicationAttemptId, String user,
+      Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    ApplicationId applicationId = applicationAttemptId.getApplicationId();
+    RequestInterceptorChainWrapper chainWrapper;
+    synchronized (applPipelineMap) {
+      if (applPipelineMap.containsKey(applicationId)) {
+        LOG.warn("Request to start an already existing appId was received. "
+            + " This can happen if an application failed and a new attempt "
+            + "was created on this machine.  ApplicationId: "
+            + applicationAttemptId.toString());
+        return;
+      }
+      chainWrapper = new RequestInterceptorChainWrapper();
+      this.applPipelineMap.put(applicationId, chainWrapper);
+    }
+    // The wrapper is registered before the (potentially expensive) chain
+    // initialization so the map lock is not held while one chain is set up.
+    LOG.info("Initializing request processing pipeline for application. "
+        + " ApplicationId:" + applicationAttemptId + " for the user: " + user);
+    try {
+      RequestInterceptor interceptorChain = createRequestInterceptorChain();
+      interceptorChain.init(createApplicationMasterContext(
+          applicationAttemptId, user, amrmToken, localToken));
+      chainWrapper.init(interceptorChain, applicationAttemptId);
+    } catch (RuntimeException e) {
+      // Do not leave a wrapper without a root interceptor registered.
+      this.applPipelineMap.remove(applicationId);
+      throw e;
+    }
+  }
+
+  /**
+   * Shuts down the request processing pipeline of the specified application
+   * and removes its application attempt from the local secret manager.
+   *
+   * @param applicationId id of the application to stop; must not be null
+   */
+  protected void stopApplication(ApplicationId applicationId) {
+    Preconditions.checkArgument(applicationId != null, "applicationId is null");
+    RequestInterceptorChainWrapper pipeline =
+        this.applPipelineMap.remove(applicationId);
+    if (pipeline == null) {
+      LOG.info("Request to stop an application that does not exist. Id:"
+          + applicationId);
+      return;
+    }
+    LOG.info("Stopping the request processing pipeline for application: "
+        + applicationId);
+    try {
+      pipeline.getRootInterceptor().shutdown();
+    } catch (Throwable ex) {
+      LOG.warn("Failed to shutdown the request processing pipeline for app:"
+          + applicationId, ex);
+    }
+    // Forget the attempt so its local AMRMToken is no longer accepted.
+    this.secretManager
+        .applicationMasterFinished(pipeline.getApplicationAttemptId());
+  }
+
+  /**
+   * Keeps the AMRMTokens of the application context current and makes sure
+   * the response only carries a token issued by the local secret manager.
+   */
+  private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
+      RequestInterceptorChainWrapper pipeline,
+      AllocateResponse allocateResponse) {
+    AMRMProxyApplicationContextImpl context =
+        (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor()
+            .getApplicationContext();
+
+    // check to see if the RM has issued a new AMRMToken & accordingly update
+    // the real AMRMToken in the current context
+    if (allocateResponse.getAMRMToken() != null) {
+      org.apache.hadoop.yarn.api.records.Token token =
+          allocateResponse.getAMRMToken();
+      // The RM-issued token is for this proxy only; the AM talks to the proxy
+      // with the local token, so the RM token must not be handed back to it.
+      allocateResponse.setAMRMToken(null);
+      org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId =
+          new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
+              token.getIdentifier().array(), token.getPassword().array(),
+              new Text(token.getKind()), new Text(token.getService()));
+      context.setAMRMToken(newTokenId);
+    }
+
+    // Check if the local AMRMToken is rolled over and update the context and
+    // response accordingly
+    MasterKeyData nextMasterKey = this.secretManager.getNextMasterKeyData();
+    if (nextMasterKey != null
+        && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+            .getKeyId()) {
+      Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
+      if (nextMasterKey.getMasterKey().getKeyId() != context
+          .getLocalAMRMTokenKeyId()) {
+        LOG.info("The local AMRMToken has been rolled-over."
+            + " Send new local AMRMToken back to application: "
+            + pipeline.getApplicationId());
+        localToken = this.secretManager
+            .createAndGetAMRMToken(pipeline.getApplicationAttemptId());
+        context.setLocalAMRMToken(localToken);
+      }
+      allocateResponse.setAMRMToken(
+          org.apache.hadoop.yarn.api.records.Token.newInstance(
+              localToken.getIdentifier(), localToken.getKind().toString(),
+              localToken.getPassword(), localToken.getService().toString()));
+    }
+  }
+
+  private AMRMProxyApplicationContext createApplicationMasterContext(
+      ApplicationAttemptId applicationAttemptId, String user,
+      Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    // Bundle the node manager context, the service configuration and the
+    // per-attempt identity and tokens for the interceptor chain.
+    return new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(),
+        applicationAttemptId, user, amrmToken, localToken);
+  }
+
+  /**
+   * Exposes the per-application request interceptor chains.
+   *
+   * @return the backing map itself (not a copy), keyed by application id
+   */
+  protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
+    return applPipelineMap;
+  }
+
+  /**
+   * This method creates and returns reference of the first intercepter in the
+   * chain of request intercepter instances.
+   *
+   * @return the reference of the first intercepter in the chain
+   */
+  protected RequestInterceptor createRequestInterceptorChain() {
+    Configuration conf = getConfig();
+
+    List<String> interceptorClassNames = getInterceptorClassNames(conf);
+
+    RequestInterceptor pipeline = null;
+    RequestInterceptor current = null;
+    for (String interceptorClassName : interceptorClassNames) {
+      try {
+        Class<?> interceptorClass =
+            conf.getClassByName(interceptorClassName);
+        if (RequestInterceptor.class.isAssignableFrom(interceptorClass)) {
+          RequestInterceptor interceptorInstance =
+              (RequestInterceptor) ReflectionUtils.newInstance(
+                  interceptorClass, conf);
+          if (pipeline == null) {
+            pipeline = interceptorInstance;
+            current = interceptorInstance;
+            continue;
+          } else {
+            current.setNextInterceptor(interceptorInstance);
+            current = interceptorInstance;
+          }
+        } else {
+          throw new YarnRuntimeException("Class: " + interceptorClassName
+              + " not instance of "
+              + RequestInterceptor.class.getCanonicalName());
+        }
+      } catch (ClassNotFoundException e) {
+        throw new YarnRuntimeException(
+            "Could not instantiate ApplicationMasterRequestInterceptor: "
+                + interceptorClassName, e);
+      }
+    }
+
+    if (pipeline == null) {
+      throw new YarnRuntimeException(
+          "RequestInterceptor pipeline is not configured in the system");
+    }
+    return pipeline;
+  }
+
+  /**
+   * Reads the configured interceptor pipeline and splits it into class names.
+   *
+   * @param conf configuration to read the pipeline setting from
+   * @return the trimmed interceptor class names as an instance of ArrayList
+   */
+  private List<String> getInterceptorClassNames(Configuration conf) {
+    String pipelineSetting = conf.get(
+        YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+        YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
+    Collection<String> rawNames =
+        StringUtils.getStringCollection(pipelineSetting);
+
+    // Trim each entry so padding around separators is not part of a name.
+    List<String> classNames = new ArrayList<String>();
+    Iterator<String> names = rawNames.iterator();
+    while (names.hasNext()) {
+      classNames.add(names.next().trim());
+    }
+    return classNames;
+  }
+
+  /**
+   * Authorizes the caller and looks up the request processing pipeline of
+   * the application named in the caller's AMRMToken identifier.
+   *
+   * @return the interceptor chain wrapper of the calling application
+   * @throws YarnException if no pipeline is registered for the application
+   */
+  private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
+      throws YarnException {
+    // authorizeRequest() supplies the AMRMToken identifier of the caller,
+    // which names the application attempt to look the pipeline up for.
+    return getInterceptorChain(YarnServerSecurityUtils.authorizeRequest());
+  }
+
+  private RequestInterceptorChainWrapper getInterceptorChain(
+      AMRMTokenIdentifier tokenIdentifier) throws YarnException {
+    // Pipelines are registered per application, so the attempt id carried by
+    // the token is reduced to its application id for the lookup.
+    ApplicationId applicationId =
+        tokenIdentifier.getApplicationAttemptId().getApplicationId();
+
+    synchronized (this.applPipelineMap) {
+      if (this.applPipelineMap.containsKey(applicationId)) {
+        return this.applPipelineMap.get(applicationId);
+      }
+      throw new YarnException(
+          "The AM request processing pipeline is not initialized for app: "
+              + applicationId.toString());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Token<AMRMTokenIdentifier> getFirstAMRMToken(
+      Collection<Token<? extends TokenIdentifier>> allTokens) {
+    // Tokens are matched by kind; iteration order decides which one is first.
+    for (Token<? extends TokenIdentifier> candidate : allTokens) {
+      boolean isAmrmToken =
+          candidate.getKind().equals(AMRMTokenIdentifier.KIND_NAME);
+      if (isAmrmToken) {
+        return (Token<AMRMTokenIdentifier>) candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Listens for application events and stops the request processing pipeline
+   * of an application once its FINISH_APPLICATION event arrives.
+   */
+  class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
+
+    @Override
+    public void handle(ApplicationEvent event) {
+      ApplicationId appId = event.getApplicationID();
+      // Events for applications unknown to the NM context are only logged.
+      Application app =
+          AMRMProxyService.this.nmContext.getApplications().get(appId);
+      if (app == null) {
+        LOG.warn("Event " + event + " sent to absent application " + appId);
+        return;
+      }
+      switch (event.getType()) {
+      case FINISH_APPLICATION:
+        LOG.info("Application stop event received for stopping AppId:"
+            + appId.toString());
+        AMRMProxyService.this.stopApplication(appId);
+        break;
+      default:
+        LOG.debug("AMRMProxy is ignoring event: " + event.getType());
+        break;
+      }
+    }
+  }
+
+  /**
+   * Holder pairing the head of an application's interceptor chain with the
+   * application attempt it was created for. Both values stay null until
+   * {@code init} is called; all accessors synchronize on the wrapper.
+   */
+  private static class RequestInterceptorChainWrapper {
+    private RequestInterceptor rootInterceptor;
+    private ApplicationAttemptId applicationAttemptId;
+
+    /**
+     * Stores the chain head and the attempt id in this wrapper.
+     *
+     * @param rootInterceptor first interceptor of the chain
+     * @param applicationAttemptId attempt the chain belongs to
+     */
+    public synchronized void init(RequestInterceptor rootInterceptor,
+        ApplicationAttemptId applicationAttemptId) {
+      this.applicationAttemptId = applicationAttemptId;
+      this.rootInterceptor = rootInterceptor;
+    }
+
+    /**
+     * Returns the head of the interceptor chain.
+     *
+     * @return the root request interceptor, or null before init
+     */
+    public synchronized RequestInterceptor getRootInterceptor() {
+      return this.rootInterceptor;
+    }
+
+    /**
+     * Returns the attempt the chain was initialized for.
+     *
+     * @return the application attempt identifier, or null before init
+     */
+    public synchronized ApplicationAttemptId getApplicationAttemptId() {
+      return this.applicationAttemptId;
+    }
+
+    /**
+     * Returns the application that the stored attempt belongs to.
+     *
+     * @return the application identifier derived from the attempt id
+     */
+    public synchronized ApplicationId getApplicationId() {
+      return getApplicationAttemptId().getApplicationId();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
new file mode 100644
index 0000000..d09ce41
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This secret manager instance is used by the AMRMProxyService to generate and
+ * manage tokens.
+ */
+public class AMRMProxyTokenSecretManager extends
+    SecretManager<AMRMTokenIdentifier> {
+
+  private static final Log LOG = LogFactory
+      .getLog(AMRMProxyTokenSecretManager.class);
+
+  private int serialNo = new SecureRandom().nextInt();
+  private MasterKeyData nextMasterKey;
+  private MasterKeyData currentMasterKey;
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  private final Timer timer;
+  private final long rollingInterval;
+  private final long activationDelay;
+
+  private final Set<ApplicationAttemptId> appAttemptSet =
+      new HashSet<ApplicationAttemptId>();
+
+  /**
+   * Create an {@link AMRMProxyTokenSecretManager}.
+   */
+  public AMRMProxyTokenSecretManager(Configuration conf) {
+    this.timer = new Timer();
+    this.rollingInterval =
+        conf.getLong(
+            YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+            YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+    // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+    // the updated shared-key.
+    this.activationDelay =
+        (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+    LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+        + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay
+        + " ms");
+    if (rollingInterval <= activationDelay * 2) {
+      throw new IllegalArgumentException(
+          YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+              + " should be more than 3 X "
+              + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+    }
+  }
+
+  public void start() {
+    if (this.currentMasterKey == null) {
+      this.currentMasterKey = createNewMasterKey();
+    }
+    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+        rollingInterval);
+  }
+
+  public void stop() {
+    this.timer.cancel();
+  }
+
+  public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Application finished, removing password for "
+          + appAttemptId);
+      this.appAttemptSet.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  private class MasterKeyRoller extends TimerTask {
+    @Override
+    public void run() {
+      rollMasterKey();
+    }
+  }
+
+  @Private
+  void rollMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Rolling master-key for amrm-tokens");
+      this.nextMasterKey = createNewMasterKey();
+      this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  private class NextKeyActivator extends TimerTask {
+    @Override
+    public void run() {
+      activateNextMasterKey();
+    }
+  }
+
+  public void activateNextMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Activating next master key with id: "
+          + this.nextMasterKey.getMasterKey().getKeyId());
+      this.currentMasterKey = this.nextMasterKey;
+      this.nextMasterKey = null;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData createNewMasterKey() {
+    this.writeLock.lock();
+    try {
+      return new MasterKeyData(serialNo++, generateSecret());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+      ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+      AMRMTokenIdentifier identifier =
+          new AMRMTokenIdentifier(appAttemptId, getMasterKey()
+              .getMasterKey().getKeyId());
+      byte[] password = this.createPassword(identifier);
+      appAttemptSet.add(appAttemptId);
+      return new Token<AMRMTokenIdentifier>(identifier.getBytes(),
+          password, identifier.getKind(), new Text());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  // If nextMasterKey is not Null, then return nextMasterKey
+  // otherwise return currentMasterKey.
+  @VisibleForTesting
+  public MasterKeyData getMasterKey() {
+    this.readLock.lock();
+    try {
+      return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Retrieve the password for the given {@link AMRMTokenIdentifier}. Used by
+   * RPC layer to validate a remote {@link AMRMTokenIdentifier}.
+   */
+  @Override
+  public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+      throws InvalidToken {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to retrieve password for "
+            + applicationAttemptId);
+      }
+      if (!appAttemptSet.contains(applicationAttemptId)) {
+        throw new InvalidToken(applicationAttemptId
+            + " not found in AMRMProxyTokenSecretManager.");
+      }
+      if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+          .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+            this.currentMasterKey.getSecretKey());
+      } else if (nextMasterKey != null
+          && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+              .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+            this.nextMasterKey.getSecretKey());
+      }
+      throw new InvalidToken("Invalid AMRMToken from "
+          + applicationAttemptId);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Creates an empty TokenId to be used for de-serializing an
+   * {@link AMRMTokenIdentifier} by the RPC layer.
+   */
+  @Override
+  public AMRMTokenIdentifier createIdentifier() {
+    return new AMRMTokenIdentifier();
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getNextMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  @Private
+  protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      LOG.info("Creating password for " + applicationAttemptId);
+      return createPassword(identifier.getBytes(), getMasterKey()
+          .getSecretKey());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
new file mode 100644
index 0000000..810dfa8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base implementation of the RequestInterceptor interface that keeps the
+ * configuration, the application context and the link to the next
+ * interceptor, and forwards life-cycle calls down the chain.
+ */
+public abstract class AbstractRequestInterceptor implements
+    RequestInterceptor {
+  private Configuration conf;
+  private AMRMProxyApplicationContext appContext;
+  private RequestInterceptor nextInterceptor;
+
+  /**
+   * Sets the next {@link RequestInterceptor} in the chain.
+   */
+  @Override
+  public void setNextInterceptor(RequestInterceptor nextInterceptor) {
+    this.nextInterceptor = nextInterceptor;
+  }
+
+  /**
+   * Sets the {@link Configuration} and passes it on to the next interceptor.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.setConf(conf);
+    }
+  }
+
+  /**
+   * Gets the {@link Configuration}.
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Initializes this and the next {@link RequestInterceptor} in the chain.
+   */
+  @Override
+  public void init(AMRMProxyApplicationContext appContext) {
+    Preconditions.checkState(this.appContext == null,
+        "init is called multiple times on this interceptor: "
+            + this.getClass().getName());
+    this.appContext = appContext;
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.init(appContext);
+    }
+  }
+
+  /**
+   * Disposes the {@link RequestInterceptor} by shutting down the next one.
+   */
+  @Override
+  public void shutdown() {
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.shutdown();
+    }
+  }
+
+  /**
+   * Gets the next {@link RequestInterceptor} in the chain.
+   */
+  @Override
+  public RequestInterceptor getNextInterceptor() {
+    return this.nextInterceptor;
+  }
+
+  /**
+   * Gets the {@link AMRMProxyApplicationContext}.
+   */
+  @Override
+  public AMRMProxyApplicationContext getApplicationContext() {
+    return this.appContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
new file mode 100644
index 0000000..2c7939b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the AbstractRequestInterceptor class and provides an implementation
+ * that simply forwards the AM requests to the cluster resource manager. It
+ * is meant to be the last interceptor in a chain and rejects a successor.
+ */
+public final class DefaultRequestInterceptor extends
+    AbstractRequestInterceptor {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(DefaultRequestInterceptor.class);
+  private ApplicationMasterProtocol rmClient;
+  private UserGroupInformation user = null;
+
+  /** Creates the RM client as a proxy user holding the AMRM token. */
+  @Override
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    try {
+      user = UserGroupInformation.createProxyUser(
+          appContext.getApplicationAttemptId().toString(),
+          UserGroupInformation.getCurrentUser());
+      user.addToken(appContext.getAMRMToken());
+      final Configuration conf = this.getConf();
+      rmClient =
+          user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+            @Override
+            public ApplicationMasterProtocol run() throws Exception {
+              return ClientRMProxy.createRMProxy(conf,
+                  ApplicationMasterProtocol.class);
+            }
+          });
+    } catch (IOException e) {
+      String message =
+          "Error while creating of RM app master service proxy for attemptId:"
+              + appContext.getApplicationAttemptId().toString()
+              + (user != null ? ", user: " + user : "");
+      // Report at error level with the cause; this failure aborts init.
+      LOG.error(message, e);
+      throw new YarnRuntimeException(message, e);
+    } catch (InterruptedException e) {
+      // Re-assert the interrupt status so that callers can still observe it.
+      Thread.currentThread().interrupt();
+      throw new YarnRuntimeException(e);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      final RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
+    LOG.info("Forwarding registration request to the real YARN RM");
+    return rmClient.registerApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(final AllocateRequest request)
+      throws YarnException, IOException {
+    LOG.debug("Forwarding allocate request to the real YARN RM");
+    AllocateResponse allocateResponse = rmClient.allocate(request);
+    // A token carried by the response replaces the proxy user's AMRM token.
+    if (allocateResponse.getAMRMToken() != null) {
+      updateAMRMToken(allocateResponse.getAMRMToken());
+    }
+    return allocateResponse;
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      final FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    LOG.info("Forwarding finish application request to "
+        + "the real YARN Resource Manager");
+    return rmClient.finishApplicationMaster(request);
+  }
+
+  /** Always throws, since this interceptor has to be the last in the chain. */
+  @Override
+  public void setNextInterceptor(RequestInterceptor next) {
+    throw new YarnRuntimeException(
+        "setNextInterceptor is being called on DefaultRequestInterceptor, "
+            + "which should be the last one in the chain. "
+            + "Check if the interceptor pipeline configuration is correct");
+  }
+
+  /** Converts the RM-issued token and stores it in the proxy user's UGI. */
+  private void updateAMRMToken(Token token) throws IOException {
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+        new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
+            token.getIdentifier().array(), token.getPassword().array(),
+            new Text(token.getKind()), new Text(token.getService()));
+    // Add the token while it still carries the service sent by the RM, so it
+    // replaces the previous token; only then set the RPC service address.
+    user.addToken(amrmToken);
+    amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
new file mode 100644
index 0000000..c74c88f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+
+/**
+ * Defines the contract to be implemented by the request interceptor classes,
+ * that can be used to intercept and inspect messages sent from the application
+ * master to the resource manager.
+ */
+public interface RequestInterceptor extends ApplicationMasterProtocol,
+    Configurable {
+  /**
+   * This method is called for initializing the interceptor. This is guaranteed
+   * to be called only once in the lifetime of this instance.
+   *
+   * @param ctx the application context this interceptor is initialized with
+   */
+  void init(AMRMProxyApplicationContext ctx);
+
+  /**
+   * This method is called to release the resources held by the interceptor.
+   * This will be called when the application pipeline is being destroyed. The
+   * concrete implementations should dispose the resources and forward the
+   * request to the next interceptor, if any.
+   */
+  void shutdown();
+
+  /**
+   * Sets the next interceptor in the pipeline. The concrete implementation of
+   * this interface should always pass the request to the nextInterceptor after
+   * inspecting the message. The last interceptor in the chain is responsible
+   * for sending the messages to the resource manager service and so the last
+   * interceptor will not receive this method call.
+   *
+   * @param nextInterceptor the interceptor that follows this one in the chain
+   */
+  void setNextInterceptor(RequestInterceptor nextInterceptor);
+
+  /**
+   * Returns the next interceptor in the chain.
+   *
+   * @return the next interceptor in the chain
+   */
+  RequestInterceptor getNextInterceptor();
+
+  /**
+   * Returns the application context of this interceptor.
+   *
+   * @return the application context
+   */
+  AMRMProxyApplicationContext getApplicationContext();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 68c7f2c..a658e53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -42,7 +42,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
@@ -51,7 +50,6 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
@@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -103,6 +102,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -135,6 +135,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
@@ -172,6 +173,8 @@ public class ContainerManagerImpl extends CompositeService implements
   private boolean serviceStopped = false;
   private final ReadLock readLock;
   private final WriteLock writeLock;
+  private AMRMProxyService amrmProxyService;
+  private boolean amrmProxyEnabled = false;
 
   private long waitForContainersOnShutdownMillis;
 
@@ -235,6 +238,20 @@ public class ContainerManagerImpl extends CompositeService implements
     addService(sharedCacheUploader);
     dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
 
+    amrmProxyEnabled =
+        conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+    if (amrmProxyEnabled) {
+      LOG.info("AMRMProxyService is enabled. "
+          + "All the AM->RM requests will be intercepted by the proxy");
+      this.amrmProxyService =
+          new AMRMProxyService(this.context, this.dispatcher);
+      addService(this.amrmProxyService);
+    } else {
+      LOG.info("AMRMProxyService is disabled");
+    }
+
     waitForContainersOnShutdownMillis =
         conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
             YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
@@ -246,6 +263,10 @@ public class ContainerManagerImpl extends CompositeService implements
     recover();
   }
 
+  public boolean isARMRMProxyEnabled() {
+    return amrmProxyEnabled; // from YarnConfiguration.AMRM_PROXY_ENABLED
+  }
+
   @SuppressWarnings("unchecked")
   private void recover() throws IOException, URISyntaxException {
     NMStateStoreService stateStore = context.getNMStateStore();
@@ -314,7 +335,8 @@ public class ContainerManagerImpl extends CompositeService implements
         + " with exit code " + rcs.getExitCode());
 
     if (context.getApplications().containsKey(appId)) {
-      Credentials credentials = parseCredentials(launchContext);
+      Credentials credentials =
+          YarnServerSecurityUtils.parseCredentials(launchContext);
       Container container = new ContainerImpl(getConfig(), dispatcher,
           context.getNMStateStore(), req.getContainerLaunchContext(),
           credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
@@ -737,8 +759,17 @@ public class ContainerManagerImpl extends CompositeService implements
         verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
           containerTokenIdentifier);
         containerId = containerTokenIdentifier.getContainerID();
-        startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
-          request);
+
+        // Initialize the AMRMProxy service instance only if the container is of
+        // type AM and if the AMRMProxy service is enabled
+        if (isARMRMProxyEnabled()
+            && containerTokenIdentifier.getContainerType().equals(
+                ContainerType.APPLICATION_MASTER)) {
+          this.amrmProxyService.processApplicationStartRequest(request);
+        }
+
+        startContainerInternal(nmTokenIdentifier,
+            containerTokenIdentifier, request);
         succeededContainers.add(containerId);
       } catch (YarnException e) {
         failedContainers.put(containerId, SerializedException.newInstance(e));
@@ -751,7 +782,7 @@ public class ContainerManagerImpl extends CompositeService implements
     }
 
     return StartContainersResponse.newInstance(getAuxServiceMetaData(),
-      succeededContainers, failedContainers);
+        succeededContainers, failedContainers);
   }
 
   private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
@@ -844,7 +875,8 @@ public class ContainerManagerImpl extends CompositeService implements
       }
     }
 
-    Credentials credentials = parseCredentials(launchContext);
+    Credentials credentials =
+        YarnServerSecurityUtils.parseCredentials(launchContext);
 
     Container container =
         new ContainerImpl(getConfig(), this.dispatcher,
@@ -928,27 +960,6 @@ public class ContainerManagerImpl extends CompositeService implements
       nmTokenIdentifier);
   }
 
-  private Credentials parseCredentials(ContainerLaunchContext launchContext)
-      throws IOException {
-    Credentials credentials = new Credentials();
-    // //////////// Parse credentials
-    ByteBuffer tokens = launchContext.getTokens();
-
-    if (tokens != null) {
-      DataInputByteBuffer buf = new DataInputByteBuffer();
-      tokens.rewind();
-      buf.reset(tokens);
-      credentials.readTokenStorageStream(buf);
-      if (LOG.isDebugEnabled()) {
-        for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
-          LOG.debug(tk.getService() + " = " + tk.toString());
-        }
-      }
-    }
-    // //////////// End of parsing credentials
-    return credentials;
-  }
-
   /**
    * Stop a list of containers running on this NodeManager.
    */