You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/09/08 05:23:48 UTC
[2/2] hadoop git commit: YARN-2884. Added a proxy service in NM to
proxy the the communication between AM and RM. Contributed by Kishore
Chaliparambil
YARN-2884. Added a proxy service in NM to proxy the the communication between AM and RM. Contributed by Kishore Chaliparambil
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f72f1e6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f72f1e6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f72f1e6
Branch: refs/heads/trunk
Commit: 6f72f1e6003ab11679bebeb96f27f1f62b3b3e02
Parents: 9b78e6e
Author: Jian He <ji...@apache.org>
Authored: Tue Sep 8 09:35:46 2015 +0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Sep 8 09:35:46 2015 +0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 17 +
.../yarn/conf/TestYarnConfigurationFields.java | 2 +
.../src/main/resources/yarn-default.xml | 34 +
.../server/utils/YarnServerSecurityUtils.java | 142 ++++
.../amrmproxy/AMRMProxyApplicationContext.java | 70 ++
.../AMRMProxyApplicationContextImpl.java | 132 ++++
.../nodemanager/amrmproxy/AMRMProxyService.java | 592 ++++++++++++++++
.../amrmproxy/AMRMProxyTokenSecretManager.java | 265 ++++++++
.../amrmproxy/AbstractRequestInterceptor.java | 102 +++
.../amrmproxy/DefaultRequestInterceptor.java | 138 ++++
.../amrmproxy/RequestInterceptor.java | 71 ++
.../containermanager/ContainerManagerImpl.java | 67 +-
.../amrmproxy/BaseAMRMProxyTest.java | 677 +++++++++++++++++++
.../amrmproxy/MockRequestInterceptor.java | 65 ++
.../amrmproxy/MockResourceManagerFacade.java | 469 +++++++++++++
.../PassThroughRequestInterceptor.java | 58 ++
.../amrmproxy/TestAMRMProxyService.java | 484 +++++++++++++
.../ApplicationMasterService.java | 69 +-
19 files changed, 3366 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a162070..be1feb4 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -187,6 +187,9 @@ Release 2.8.0 - UNRELEASED
YARN-3970. Add REST api support for Application Priority.
(Naganarasimha G R via vvasudev)
+ YARN-2884. Added a proxy service in NM to proxy the the communication
+ between AM and RM. (Kishore Chaliparambil via jianhe)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4879ca1..9ec25ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1332,6 +1332,23 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
+ "application.classpath";
+ public static final String AMRM_PROXY_ENABLED = NM_PREFIX
+ + "amrmproxy.enable";
+ public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
+ public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+ + "amrmproxy.address";
+ public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
+ public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
+ + DEFAULT_AMRM_PROXY_PORT;
+ public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
+ + "amrmproxy.client.thread-count";
+ public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
+ public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+ NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
+ public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+ "org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
+ + "DefaultRequestInterceptor";
+
/**
* Default platform-agnostic CLASSPATH for YARN applications. A
* comma-separated list of CLASSPATH entries. The parameter expansion marker
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index e89a90d..97fcfa1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -86,6 +86,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
// Ignore all YARN Application Timeline Service (version 1) properties
configurationPrefixToSkipCompare.add("yarn.timeline-service.");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 59bfb56..b76defb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2259,4 +2259,38 @@
<value></value>
</property>
+ <property>
+ <description>
+ Enable/Disable AMRMProxyService in the node manager. This service is used to intercept
+ calls from the application masters to the resource manager.
+ </description>
+ <name>yarn.nodemanager.amrmproxy.enable</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <description>
+ The address of the AMRMProxyService listener.
+ </description>
+ <name>yarn.nodemanager.amrmproxy.address</name>
+ <value>0.0.0.0:8048</value>
+ </property>
+
+ <property>
+ <description>
+ The number of threads used to handle requests by the AMRMProxyService.
+ </description>
+ <name>yarn.nodemanager.amrmproxy.client.thread-count</name>
+ <value>25</value>
+ </property>
+
+ <property>
+ <description>
+ The comma separated list of class names that implement the RequestInterceptor interface. This is used by the
+ AMRMProxyService to create the request processing pipeline for applications.
+ </description>
+ <name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
+ <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
new file mode 100644
index 0000000..9af556e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that contains commonly used server methods.
+ *
+ */
+@Private
+public final class YarnServerSecurityUtils {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(YarnServerSecurityUtils.class);
+
+ private YarnServerSecurityUtils() {
+ }
+
+ /**
+ * Authorizes the current request and returns the AMRMTokenIdentifier for the
+ * current application.
+ *
+ * @return the AMRMTokenIdentifier instance for the current user
+ * @throws YarnException
+ */
+ public static AMRMTokenIdentifier authorizeRequest()
+ throws YarnException {
+
+ UserGroupInformation remoteUgi;
+ try {
+ remoteUgi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ String msg =
+ "Cannot obtain the user-name for authorizing ApplicationMaster. "
+ + "Got exception: " + StringUtils.stringifyException(e);
+ LOG.warn(msg);
+ throw RPCUtil.getRemoteException(msg);
+ }
+
+ boolean tokenFound = false;
+ String message = "";
+ AMRMTokenIdentifier appTokenIdentifier = null;
+ try {
+ appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
+ if (appTokenIdentifier == null) {
+ tokenFound = false;
+ message = "No AMRMToken found for user " + remoteUgi.getUserName();
+ } else {
+ tokenFound = true;
+ }
+ } catch (IOException e) {
+ tokenFound = false;
+ message =
+ "Got exception while looking for AMRMToken for user "
+ + remoteUgi.getUserName();
+ }
+
+ if (!tokenFound) {
+ LOG.warn(message);
+ throw RPCUtil.getRemoteException(message);
+ }
+
+ return appTokenIdentifier;
+ }
+
+ // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
+ // currently sets only the required id, but iterate through anyways just to be
+ // sure.
+ private static AMRMTokenIdentifier selectAMRMTokenIdentifier(
+ UserGroupInformation remoteUgi) throws IOException {
+ AMRMTokenIdentifier result = null;
+ Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
+ for (TokenIdentifier tokenId : tokenIds) {
+ if (tokenId instanceof AMRMTokenIdentifier) {
+ result = (AMRMTokenIdentifier) tokenId;
+ break;
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Parses the container launch context and returns a Credential instance that
+ * contains all the tokens from the launch context.
+ * @param launchContext
+ * @return the credential instance
+ * @throws IOException
+ */
+ public static Credentials parseCredentials(
+ ContainerLaunchContext launchContext) throws IOException {
+ Credentials credentials = new Credentials();
+ ByteBuffer tokens = launchContext.getTokens();
+
+ if (tokens != null) {
+ DataInputByteBuffer buf = new DataInputByteBuffer();
+ tokens.rewind();
+ buf.reset(tokens);
+ credentials.readTokenStorageStream(buf);
+ if (LOG.isDebugEnabled()) {
+ for (Token<? extends TokenIdentifier> tk : credentials
+ .getAllTokens()) {
+ LOG.debug(tk.getService() + " = " + tk.toString());
+ }
+ }
+ }
+
+ return credentials;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
new file mode 100644
index 0000000..c355a8b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Interface that can be used by the intercepter plugins to get the information
+ * about one application.
+ *
+ */
+public interface AMRMProxyApplicationContext {
+
+ /**
+ * Gets the configuration object instance.
+ * @return the configuration object.
+ */
+ Configuration getConf();
+
+ /**
+ * Gets the application attempt identifier.
+ * @return the application attempt identifier.
+ */
+ ApplicationAttemptId getApplicationAttemptId();
+
+ /**
+ * Gets the application submitter.
+ * @return the application submitter
+ */
+ String getUser();
+
+ /**
+ * Gets the application's AMRMToken that is issued by the RM.
+ * @return the application's AMRMToken that is issued by the RM.
+ */
+ Token<AMRMTokenIdentifier> getAMRMToken();
+
+ /**
+ * Gets the application's local AMRMToken issued by the proxy service.
+ * @return the application's local AMRMToken issued by the proxy service.
+ */
+ Token<AMRMTokenIdentifier> getLocalAMRMToken();
+
+ /**
+ * Gets the NMContext object.
+ * @return the NMContext.
+ */
+ Context getNMCotext();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
new file mode 100644
index 0000000..2e5aa94
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Encapsulates the information about one application that is needed by the
+ * request intercepters.
+ *
+ */
+public class AMRMProxyApplicationContextImpl implements
+ AMRMProxyApplicationContext {
+ private final Configuration conf;
+ private final Context nmContext;
+ private final ApplicationAttemptId applicationAttemptId;
+ private final String user;
+ private Integer localTokenKeyId;
+ private Token<AMRMTokenIdentifier> amrmToken;
+ private Token<AMRMTokenIdentifier> localToken;
+
+ /**
+ * Create an instance of the AMRMProxyApplicationContext.
+ *
+ * @param nmContext
+ * @param conf
+ * @param applicationAttemptId
+ * @param user
+ * @param amrmToken
+ */
+ public AMRMProxyApplicationContextImpl(Context nmContext,
+ Configuration conf, ApplicationAttemptId applicationAttemptId,
+ String user, Token<AMRMTokenIdentifier> amrmToken,
+ Token<AMRMTokenIdentifier> localToken) {
+ this.nmContext = nmContext;
+ this.conf = conf;
+ this.applicationAttemptId = applicationAttemptId;
+ this.user = user;
+ this.amrmToken = amrmToken;
+ this.localToken = localToken;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
+ @Override
+ public String getUser() {
+ return user;
+ }
+
+ @Override
+ public synchronized Token<AMRMTokenIdentifier> getAMRMToken() {
+ return amrmToken;
+ }
+
+ /**
+ * Sets the application's AMRMToken.
+ */
+ public synchronized void setAMRMToken(
+ Token<AMRMTokenIdentifier> amrmToken) {
+ this.amrmToken = amrmToken;
+ }
+
+ @Override
+ public synchronized Token<AMRMTokenIdentifier> getLocalAMRMToken() {
+ return this.localToken;
+ }
+
+ /**
+ * Sets the application's AMRMToken.
+ */
+ public synchronized void setLocalAMRMToken(
+ Token<AMRMTokenIdentifier> localToken) {
+ this.localToken = localToken;
+ this.localTokenKeyId = null;
+ }
+
+ @Private
+ public synchronized int getLocalAMRMTokenKeyId() {
+ Integer keyId = this.localTokenKeyId;
+ if (keyId == null) {
+ try {
+ if (this.localToken == null) {
+ throw new YarnRuntimeException("Missing AMRM token for "
+ + this.applicationAttemptId);
+ }
+ keyId = this.amrmToken.decodeIdentifier().getKeyId();
+ this.localTokenKeyId = keyId;
+ } catch (IOException e) {
+ throw new YarnRuntimeException("AMRM token decode error for "
+ + this.applicationAttemptId, e);
+ }
+ }
+ return keyId;
+ }
+
+ @Override
+ public Context getNMCotext() {
+ return nmContext;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
new file mode 100644
index 0000000..bd6538c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -0,0 +1,592 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * AMRMProxyService is a service that runs on each node manager that can be used
+ * to intercept and inspect messages from application master to the cluster
+ * resource manager. It listens to messages from the application master and
+ * creates a request intercepting pipeline instance for each application. The
+ * pipeline is a chain of intercepter instances that can inspect and modify the
+ * request/response as needed.
+ */
+public class AMRMProxyService extends AbstractService implements
+ ApplicationMasterProtocol {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AMRMProxyService.class);
+ private Server server;
+ private final Context nmContext;
+ private final AsyncDispatcher dispatcher;
+ private InetSocketAddress listenerEndpoint;
+ private AMRMProxyTokenSecretManager secretManager;
+ private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
+
+ /**
+ * Creates an instance of the service.
+ *
+ * @param nmContext
+ * @param dispatcher
+ */
+ public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
+ super(AMRMProxyService.class.getName());
+ Preconditions.checkArgument(nmContext != null, "nmContext is null");
+ Preconditions.checkArgument(dispatcher != null, "dispatcher is null");
+ this.nmContext = nmContext;
+ this.dispatcher = dispatcher;
+ this.applPipelineMap =
+ new ConcurrentHashMap<ApplicationId, RequestInterceptorChainWrapper>();
+
+ this.dispatcher.register(ApplicationEventType.class,
+ new ApplicationEventHandler());
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.info("Starting AMRMProxyService");
+ Configuration conf = getConfig();
+ YarnRPC rpc = YarnRPC.create(conf);
+ UserGroupInformation.setConfiguration(conf);
+
+ this.listenerEndpoint =
+ conf.getSocketAddr(YarnConfiguration.AMRM_PROXY_ADDRESS,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_PORT);
+
+ Configuration serverConf = new Configuration(conf);
+ serverConf.set(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ SaslRpcServer.AuthMethod.TOKEN.toString());
+
+ int numWorkerThreads =
+ serverConf.getInt(
+ YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT);
+
+ this.secretManager = new AMRMProxyTokenSecretManager(serverConf);
+ this.secretManager.start();
+
+ this.server =
+ rpc.getServer(ApplicationMasterProtocol.class, this,
+ listenerEndpoint, serverConf, this.secretManager,
+ numWorkerThreads);
+
+ this.server.start();
+ LOG.info("AMRMProxyService listening on address: "
+ + this.server.getListenerAddress());
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Stopping AMRMProxyService");
+ if (this.server != null) {
+ this.server.stop();
+ }
+
+ this.secretManager.stop();
+
+ super.serviceStop();
+ }
+
+ /**
+ * This is called by the AMs started on this node to register with the RM.
+ * This method does the initial authorization and then forwards the request to
+ * the application instance specific intercepter chain.
+ */
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ LOG.info("Registering application master." + " Host:"
+ + request.getHost() + " Port:" + request.getRpcPort()
+ + " Tracking Url:" + request.getTrackingUrl());
+ RequestInterceptorChainWrapper pipeline =
+ authorizeAndGetInterceptorChain();
+ return pipeline.getRootInterceptor()
+ .registerApplicationMaster(request);
+ }
+
+ /**
+ * This is called by the AMs started on this node to unregister from the RM.
+ * This method does the initial authorization and then forwards the request to
+ * the application instance specific intercepter chain.
+ */
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ LOG.info("Finishing application master. Tracking Url:"
+ + request.getTrackingUrl());
+ RequestInterceptorChainWrapper pipeline =
+ authorizeAndGetInterceptorChain();
+ return pipeline.getRootInterceptor().finishApplicationMaster(request);
+ }
+
+ /**
+ * This is called by the AMs started on this node to send heart beat to RM.
+ * This method does the initial authorization and then forwards the request to
+ * the application instance specific pipeline, which is a chain of request
+ * intercepter objects. One application request processing pipeline is created
+ * per AM instance.
+ */
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ AMRMTokenIdentifier amrmTokenIdentifier =
+ YarnServerSecurityUtils.authorizeRequest();
+ RequestInterceptorChainWrapper pipeline =
+ getInterceptorChain(amrmTokenIdentifier);
+ AllocateResponse allocateResponse =
+ pipeline.getRootInterceptor().allocate(request);
+
+ updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse);
+
+ return allocateResponse;
+ }
+
+ /**
+ * Callback from the ContainerManager implementation for initializing the
+ * application request processing pipeline.
+ *
+ * @param request - encapsulates information for starting an AM
+ * @throws IOException
+ * @throws YarnException
+ */
+ public void processApplicationStartRequest(StartContainerRequest request)
+ throws IOException, YarnException {
+ LOG.info("Callback received for initializing request "
+ + "processing pipeline for an AM");
+ ContainerTokenIdentifier containerTokenIdentifierForKey =
+ BuilderUtils.newContainerTokenIdentifier(request
+ .getContainerToken());
+ ApplicationAttemptId appAttemptId =
+ containerTokenIdentifierForKey.getContainerID()
+ .getApplicationAttemptId();
+ Credentials credentials =
+ YarnServerSecurityUtils.parseCredentials(request
+ .getContainerLaunchContext());
+
+ Token<AMRMTokenIdentifier> amrmToken =
+ getFirstAMRMToken(credentials.getAllTokens());
+ if (amrmToken == null) {
+ throw new YarnRuntimeException(
+ "AMRMToken not found in the start container request for application:"
+ + appAttemptId.toString());
+ }
+
+ // Substitute the existing AMRM Token with a local one. Keep the rest of the
+ // tokens in the credentials intact.
+ Token<AMRMTokenIdentifier> localToken =
+ this.secretManager.createAndGetAMRMToken(appAttemptId);
+ credentials.addToken(localToken.getService(), localToken);
+
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ request.getContainerLaunchContext().setTokens(
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+
+ initializePipeline(containerTokenIdentifierForKey.getContainerID()
+ .getApplicationAttemptId(),
+ containerTokenIdentifierForKey.getApplicationSubmitter(),
+ amrmToken, localToken);
+ }
+
+ /**
+ * Initializes the request intercepter pipeline for the specified application.
+ *
+ * @param applicationAttemptId
+ * @param user
+ * @param amrmToken
+ */
+ protected void initializePipeline(
+ ApplicationAttemptId applicationAttemptId, String user,
+ Token<AMRMTokenIdentifier> amrmToken,
+ Token<AMRMTokenIdentifier> localToken) {
+ RequestInterceptorChainWrapper chainWrapper = null;
+ synchronized (applPipelineMap) {
+ if (applPipelineMap.containsKey(applicationAttemptId.getApplicationId())) {
+ LOG.warn("Request to start an already existing appId was received. "
+ + " This can happen if an application failed and a new attempt "
+ + "was created on this machine. ApplicationId: "
+ + applicationAttemptId.toString());
+ return;
+ }
+
+ chainWrapper = new RequestInterceptorChainWrapper();
+ this.applPipelineMap.put(applicationAttemptId.getApplicationId(),
+ chainWrapper);
+ }
+
+ // We register the pipeline instance in the map first and then initialize it
+ // later because chain initialization can be expensive and we would like to
+ // release the lock as soon as possible to prevent other applications from
+ // blocking when one application's chain is initializing
+ LOG.info("Initializing request processing pipeline for application. "
+ + " ApplicationId:" + applicationAttemptId + " for the user: "
+ + user);
+
+ RequestInterceptor interceptorChain =
+ this.createRequestInterceptorChain();
+ interceptorChain.init(createApplicationMasterContext(
+ applicationAttemptId, user, amrmToken, localToken));
+ chainWrapper.init(interceptorChain, applicationAttemptId);
+ }
+
+ /**
+ * Shuts down the request processing pipeline for the specified application
+ * attempt id.
+ *
+ * @param applicationId
+ */
+ protected void stopApplication(ApplicationId applicationId) {
+ Preconditions.checkArgument(applicationId != null,
+ "applicationId is null");
+ RequestInterceptorChainWrapper pipeline =
+ this.applPipelineMap.remove(applicationId);
+
+ if (pipeline == null) {
+ LOG.info("Request to stop an application that does not exist. Id:"
+ + applicationId);
+ } else {
+ LOG.info("Stopping the request processing pipeline for application: "
+ + applicationId);
+ try {
+ pipeline.getRootInterceptor().shutdown();
+ } catch (Throwable ex) {
+ LOG.warn(
+ "Failed to shutdown the request processing pipeline for app:"
+ + applicationId, ex);
+ }
+ }
+ }
+
+ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
+ RequestInterceptorChainWrapper pipeline,
+ AllocateResponse allocateResponse) {
+ AMRMProxyApplicationContextImpl context =
+ (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor()
+ .getApplicationContext();
+
+ // check to see if the RM has issued a new AMRMToken & accordingly update
+ // the real ARMRMToken in the current context
+ if (allocateResponse.getAMRMToken() != null) {
+ org.apache.hadoop.yarn.api.records.Token token =
+ allocateResponse.getAMRMToken();
+
+ org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId =
+ new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
+ token.getIdentifier().array(), token.getPassword().array(),
+ new Text(token.getKind()), new Text(token.getService()));
+
+ context.setAMRMToken(newTokenId);
+ }
+
+ // Check if the local AMRMToken is rolled up and update the context and
+ // response accordingly
+ MasterKeyData nextMasterKey =
+ this.secretManager.getNextMasterKeyData();
+
+ if (nextMasterKey != null
+ && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+ .getKeyId()) {
+ Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
+ if (nextMasterKey.getMasterKey().getKeyId() != context
+ .getLocalAMRMTokenKeyId()) {
+ LOG.info("The local AMRMToken has been rolled-over."
+ + " Send new local AMRMToken back to application: "
+ + pipeline.getApplicationId());
+ localToken =
+ this.secretManager.createAndGetAMRMToken(pipeline
+ .getApplicationAttemptId());
+ context.setLocalAMRMToken(localToken);
+ }
+
+ allocateResponse
+ .setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+ .newInstance(localToken.getIdentifier(), localToken
+ .getKind().toString(), localToken.getPassword(),
+ localToken.getService().toString()));
+ }
+ }
+
+ private AMRMProxyApplicationContext createApplicationMasterContext(
+ ApplicationAttemptId applicationAttemptId, String user,
+ Token<AMRMTokenIdentifier> amrmToken,
+ Token<AMRMTokenIdentifier> localToken) {
+ AMRMProxyApplicationContextImpl appContext =
+ new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(),
+ applicationAttemptId, user, amrmToken, localToken);
+ return appContext;
+ }
+
+ /**
+ * Gets the Request intercepter chains for all the applications.
+ *
+ * @return the request intercepter chains.
+ */
+ protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
+ return this.applPipelineMap;
+ }
+
+ /**
+ * This method creates and returns reference of the first intercepter in the
+ * chain of request intercepter instances.
+ *
+ * @return the reference of the first intercepter in the chain
+ */
+ protected RequestInterceptor createRequestInterceptorChain() {
+ Configuration conf = getConfig();
+
+ List<String> interceptorClassNames = getInterceptorClassNames(conf);
+
+ RequestInterceptor pipeline = null;
+ RequestInterceptor current = null;
+ for (String interceptorClassName : interceptorClassNames) {
+ try {
+ Class<?> interceptorClass =
+ conf.getClassByName(interceptorClassName);
+ if (RequestInterceptor.class.isAssignableFrom(interceptorClass)) {
+ RequestInterceptor interceptorInstance =
+ (RequestInterceptor) ReflectionUtils.newInstance(
+ interceptorClass, conf);
+ if (pipeline == null) {
+ pipeline = interceptorInstance;
+ current = interceptorInstance;
+ continue;
+ } else {
+ current.setNextInterceptor(interceptorInstance);
+ current = interceptorInstance;
+ }
+ } else {
+ throw new YarnRuntimeException("Class: " + interceptorClassName
+ + " not instance of "
+ + RequestInterceptor.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(
+ "Could not instantiate ApplicationMasterRequestInterceptor: "
+ + interceptorClassName, e);
+ }
+ }
+
+ if (pipeline == null) {
+ throw new YarnRuntimeException(
+ "RequestInterceptor pipeline is not configured in the system");
+ }
+ return pipeline;
+ }
+
+ /**
+ * Returns the comma separated intercepter class names from the configuration.
+ *
+ * @param conf
+ * @return the intercepter class names as an instance of ArrayList
+ */
+ private List<String> getInterceptorClassNames(Configuration conf) {
+ String configuredInterceptorClassNames =
+ conf.get(
+ YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
+
+ List<String> interceptorClassNames = new ArrayList<String>();
+ Collection<String> tempList =
+ StringUtils.getStringCollection(configuredInterceptorClassNames);
+ for (String item : tempList) {
+ interceptorClassNames.add(item.trim());
+ }
+
+ return interceptorClassNames;
+ }
+
+ /**
+ * Authorizes the request and returns the application specific request
+ * processing pipeline.
+ *
+ * @return the the intercepter wrapper instance
+ * @throws YarnException
+ */
+ private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
+ throws YarnException {
+ AMRMTokenIdentifier tokenIdentifier =
+ YarnServerSecurityUtils.authorizeRequest();
+ return getInterceptorChain(tokenIdentifier);
+ }
+
+ private RequestInterceptorChainWrapper getInterceptorChain(
+ AMRMTokenIdentifier tokenIdentifier) throws YarnException {
+ ApplicationAttemptId appAttemptId =
+ tokenIdentifier.getApplicationAttemptId();
+
+ synchronized (this.applPipelineMap) {
+ if (!this.applPipelineMap.containsKey(appAttemptId
+ .getApplicationId())) {
+ throw new YarnException(
+ "The AM request processing pipeline is not initialized for app: "
+ + appAttemptId.getApplicationId().toString());
+ }
+
+ return this.applPipelineMap.get(appAttemptId.getApplicationId());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Token<AMRMTokenIdentifier> getFirstAMRMToken(
+ Collection<Token<? extends TokenIdentifier>> allTokens) {
+ Iterator<Token<? extends TokenIdentifier>> iter = allTokens.iterator();
+ while (iter.hasNext()) {
+ Token<? extends TokenIdentifier> token = iter.next();
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ return (Token<AMRMTokenIdentifier>) token;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Private class for handling application stop events.
+ *
+ */
+ class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
+
+ @Override
+ public void handle(ApplicationEvent event) {
+ Application app =
+ AMRMProxyService.this.nmContext.getApplications().get(
+ event.getApplicationID());
+ if (app != null) {
+ switch (event.getType()) {
+ case FINISH_APPLICATION:
+ LOG.info("Application stop event received for stopping AppId:"
+ + event.getApplicationID().toString());
+ AMRMProxyService.this.stopApplication(event.getApplicationID());
+ break;
+ default:
+ LOG.debug("AMRMProxy is ignoring event: " + event.getType());
+ break;
+ }
+ } else {
+ LOG.warn("Event " + event + " sent to absent application "
+ + event.getApplicationID());
+ }
+ }
+ }
+
+ /**
+ * Private structure for encapsulating RequestInterceptor and
+ * ApplicationAttemptId instances.
+ *
+ */
+ private static class RequestInterceptorChainWrapper {
+ private RequestInterceptor rootInterceptor;
+ private ApplicationAttemptId applicationAttemptId;
+
+ /**
+ * Initializes the wrapper with the specified parameters.
+ *
+ * @param rootInterceptor
+ * @param applicationAttemptId
+ */
+ public synchronized void init(RequestInterceptor rootInterceptor,
+ ApplicationAttemptId applicationAttemptId) {
+ this.rootInterceptor = rootInterceptor;
+ this.applicationAttemptId = applicationAttemptId;
+ }
+
+ /**
+ * Gets the root request intercepter.
+ *
+ * @return the root request intercepter
+ */
+ public synchronized RequestInterceptor getRootInterceptor() {
+ return rootInterceptor;
+ }
+
+ /**
+ * Gets the application attempt identifier.
+ *
+ * @return the application attempt identifier
+ */
+ public synchronized ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
+ /**
+ * Gets the application identifier.
+ *
+ * @return the application identifier
+ */
+ public synchronized ApplicationId getApplicationId() {
+ return applicationAttemptId.getApplicationId();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
new file mode 100644
index 0000000..d09ce41
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This secret manager instance is used by the AMRMProxyService to generate and
+ * manage tokens.
+ */
+public class AMRMProxyTokenSecretManager extends
+ SecretManager<AMRMTokenIdentifier> {
+
+ private static final Log LOG = LogFactory
+ .getLog(AMRMProxyTokenSecretManager.class);
+
+ private int serialNo = new SecureRandom().nextInt();
+ private MasterKeyData nextMasterKey;
+ private MasterKeyData currentMasterKey;
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final Lock readLock = readWriteLock.readLock();
+ private final Lock writeLock = readWriteLock.writeLock();
+
+ private final Timer timer;
+ private final long rollingInterval;
+ private final long activationDelay;
+
+ private final Set<ApplicationAttemptId> appAttemptSet =
+ new HashSet<ApplicationAttemptId>();
+
+ /**
+ * Create an {@link AMRMProxyTokenSecretManager}.
+ */
+ public AMRMProxyTokenSecretManager(Configuration conf) {
+ this.timer = new Timer();
+ this.rollingInterval =
+ conf.getLong(
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+ YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+ // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+ // the updated shared-key.
+ this.activationDelay =
+ (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+ LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+ + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay
+ + " ms");
+ if (rollingInterval <= activationDelay * 2) {
+ throw new IllegalArgumentException(
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+ + " should be more than 3 X "
+ + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+ }
+ }
+
+ public void start() {
+ if (this.currentMasterKey == null) {
+ this.currentMasterKey = createNewMasterKey();
+ }
+ this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+ rollingInterval);
+ }
+
+ public void stop() {
+ this.timer.cancel();
+ }
+
+ public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+ this.writeLock.lock();
+ try {
+ LOG.info("Application finished, removing password for "
+ + appAttemptId);
+ this.appAttemptSet.remove(appAttemptId);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private class MasterKeyRoller extends TimerTask {
+ @Override
+ public void run() {
+ rollMasterKey();
+ }
+ }
+
+ @Private
+ void rollMasterKey() {
+ this.writeLock.lock();
+ try {
+ LOG.info("Rolling master-key for amrm-tokens");
+ this.nextMasterKey = createNewMasterKey();
+ this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private class NextKeyActivator extends TimerTask {
+ @Override
+ public void run() {
+ activateNextMasterKey();
+ }
+ }
+
+ public void activateNextMasterKey() {
+ this.writeLock.lock();
+ try {
+ LOG.info("Activating next master key with id: "
+ + this.nextMasterKey.getMasterKey().getKeyId());
+ this.currentMasterKey = this.nextMasterKey;
+ this.nextMasterKey = null;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public MasterKeyData createNewMasterKey() {
+ this.writeLock.lock();
+ try {
+ return new MasterKeyData(serialNo++, generateSecret());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+ ApplicationAttemptId appAttemptId) {
+ this.writeLock.lock();
+ try {
+ LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+ AMRMTokenIdentifier identifier =
+ new AMRMTokenIdentifier(appAttemptId, getMasterKey()
+ .getMasterKey().getKeyId());
+ byte[] password = this.createPassword(identifier);
+ appAttemptSet.add(appAttemptId);
+ return new Token<AMRMTokenIdentifier>(identifier.getBytes(),
+ password, identifier.getKind(), new Text());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ // If nextMasterKey is not Null, then return nextMasterKey
+ // otherwise return currentMasterKey.
+ @VisibleForTesting
+ public MasterKeyData getMasterKey() {
+ this.readLock.lock();
+ try {
+ return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Retrieve the password for the given {@link AMRMTokenIdentifier}. Used by
+ * RPC layer to validate a remote {@link AMRMTokenIdentifier}.
+ */
+ @Override
+ public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+ throws InvalidToken {
+ this.readLock.lock();
+ try {
+ ApplicationAttemptId applicationAttemptId =
+ identifier.getApplicationAttemptId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to retrieve password for "
+ + applicationAttemptId);
+ }
+ if (!appAttemptSet.contains(applicationAttemptId)) {
+ throw new InvalidToken(applicationAttemptId
+ + " not found in AMRMProxyTokenSecretManager.");
+ }
+ if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+ .getKeyId()) {
+ return createPassword(identifier.getBytes(),
+ this.currentMasterKey.getSecretKey());
+ } else if (nextMasterKey != null
+ && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+ .getKeyId()) {
+ return createPassword(identifier.getBytes(),
+ this.nextMasterKey.getSecretKey());
+ }
+ throw new InvalidToken("Invalid AMRMToken from "
+ + applicationAttemptId);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Creates an empty TokenId to be used for de-serializing an
+ * {@link AMRMTokenIdentifier} by the RPC layer.
+ */
+ @Override
+ public AMRMTokenIdentifier createIdentifier() {
+ return new AMRMTokenIdentifier();
+ }
+
+ @Private
+ @VisibleForTesting
+ public MasterKeyData getNextMasterKeyData() {
+ this.readLock.lock();
+ try {
+ return this.nextMasterKey;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ @Private
+ protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+ this.readLock.lock();
+ try {
+ ApplicationAttemptId applicationAttemptId =
+ identifier.getApplicationAttemptId();
+ LOG.info("Creating password for " + applicationAttemptId);
+ return createPassword(identifier.getBytes(), getMasterKey()
+ .getSecretKey());
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
new file mode 100644
index 0000000..810dfa8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implements the RequestInterceptor interface and provides common functionality
+ * which can can be used and/or extended by other concrete intercepter classes.
+ *
+ */
+public abstract class AbstractRequestInterceptor implements
+ RequestInterceptor {
+ private Configuration conf;
+ private AMRMProxyApplicationContext appContext;
+ private RequestInterceptor nextInterceptor;
+
+ /**
+ * Sets the {@link RequestInterceptor} in the chain.
+ */
+ @Override
+ public void setNextInterceptor(RequestInterceptor nextInterceptor) {
+ this.nextInterceptor = nextInterceptor;
+ }
+
+ /**
+ * Sets the {@link Configuration}.
+ */
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.setConf(conf);
+ }
+ }
+
+ /**
+ * Gets the {@link Configuration}.
+ */
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ /**
+ * Initializes the {@link RequestInterceptor}.
+ */
+ @Override
+ public void init(AMRMProxyApplicationContext appContext) {
+ Preconditions.checkState(this.appContext == null,
+ "init is called multiple times on this interceptor: "
+ + this.getClass().getName());
+ this.appContext = appContext;
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.init(appContext);
+ }
+ }
+
+ /**
+ * Disposes the {@link RequestInterceptor}.
+ */
+ @Override
+ public void shutdown() {
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.shutdown();
+ }
+ }
+
+ /**
+ * Gets the next {@link RequestInterceptor} in the chain.
+ */
+ @Override
+ public RequestInterceptor getNextInterceptor() {
+ return this.nextInterceptor;
+ }
+
+ /**
+ * Gets the {@link AMRMProxyApplicationContext}.
+ */
+ public AMRMProxyApplicationContext getApplicationContext() {
+ return this.appContext;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
new file mode 100644
index 0000000..2c7939b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the AbstractRequestInterceptor class and provides an implementation
+ * that simply forwards the AM requests to the cluster resource manager.
+ *
+ */
+public final class DefaultRequestInterceptor extends
+ AbstractRequestInterceptor {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(DefaultRequestInterceptor.class);
+ private ApplicationMasterProtocol rmClient;
+ private UserGroupInformation user = null;
+
+ @Override
+ public void init(AMRMProxyApplicationContext appContext) {
+ super.init(appContext);
+ try {
+ user =
+ UserGroupInformation.createProxyUser(appContext
+ .getApplicationAttemptId().toString(), UserGroupInformation
+ .getCurrentUser());
+ user.addToken(appContext.getAMRMToken());
+ final Configuration conf = this.getConf();
+
+ rmClient =
+ user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+ @Override
+ public ApplicationMasterProtocol run() throws Exception {
+ return ClientRMProxy.createRMProxy(conf,
+ ApplicationMasterProtocol.class);
+ }
+ });
+ } catch (IOException e) {
+ String message =
+ "Error while creating of RM app master service proxy for attemptId:"
+ + appContext.getApplicationAttemptId().toString();
+ if (user != null) {
+ message += ", user: " + user;
+ }
+
+ LOG.info(message);
+ throw new YarnRuntimeException(message, e);
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ final RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
+ LOG.info("Forwarding registration request to the real YARN RM");
+ return rmClient.registerApplicationMaster(request);
+ }
+
+ @Override
+ public AllocateResponse allocate(final AllocateRequest request)
+ throws YarnException, IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Forwarding allocate request to the real YARN RM");
+ }
+ AllocateResponse allocateResponse = rmClient.allocate(request);
+ if (allocateResponse.getAMRMToken() != null) {
+ updateAMRMToken(allocateResponse.getAMRMToken());
+ }
+
+ return allocateResponse;
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ final FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ LOG.info("Forwarding finish application request to "
+ + "the real YARN Resource Manager");
+ return rmClient.finishApplicationMaster(request);
+ }
+
+ @Override
+ public void setNextInterceptor(RequestInterceptor next) {
+ throw new YarnRuntimeException(
+ "setNextInterceptor is being called on DefaultRequestInterceptor,"
+ + "which should be the last one in the chain "
+ + "Check if the interceptor pipeline configuration is correct");
+ }
+
+ private void updateAMRMToken(Token token) throws IOException {
+ org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+ new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
+ token.getIdentifier().array(), token.getPassword().array(),
+ new Text(token.getKind()), new Text(token.getService()));
+ // Preserve the token service sent by the RM when adding the token
+ // to ensure we replace the previous token setup by the RM.
+ // Afterwards we can update the service address for the RPC layer.
+ user.addToken(amrmToken);
+ amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
new file mode 100644
index 0000000..c74c88f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+
+/**
+ * Defines the contract to be implemented by the request intercepter classes,
+ * that can be used to intercept and inspect messages sent from the application
+ * master to the resource manager.
+ */
+public interface RequestInterceptor extends ApplicationMasterProtocol,
+ Configurable {
+ /**
+ * This method is called for initializing the intercepter. This is guaranteed
+ * to be called only once in the lifetime of this instance.
+ *
+ * @param ctx
+ */
+ void init(AMRMProxyApplicationContext ctx);
+
+ /**
+ * This method is called to release the resources held by the intercepter.
+ * This will be called when the application pipeline is being destroyed. The
+ * concrete implementations should dispose the resources and forward the
+ * request to the next intercepter, if any.
+ */
+ void shutdown();
+
+ /**
+ * Sets the next intercepter in the pipeline. The concrete implementation of
+ * this interface should always pass the request to the nextInterceptor after
+ * inspecting the message. The last intercepter in the chain is responsible to
+ * send the messages to the resource manager service and so the last
+ * intercepter will not receive this method call.
+ *
+ * @param nextInterceptor
+ */
+ void setNextInterceptor(RequestInterceptor nextInterceptor);
+
+ /**
+ * Returns the next intercepter in the chain.
+ *
+ * @return the next intercepter in the chain
+ */
+ RequestInterceptor getNextInterceptor();
+
+ /**
+ * Returns the context.
+ *
+ * @return the context
+ */
+ AMRMProxyApplicationContext getApplicationContext();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 68c7f2c..a658e53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -42,7 +42,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
@@ -51,7 +50,6 @@ import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
@@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -103,6 +102,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -135,6 +135,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
@@ -172,6 +173,8 @@ public class ContainerManagerImpl extends CompositeService implements
private boolean serviceStopped = false;
private final ReadLock readLock;
private final WriteLock writeLock;
+ private AMRMProxyService amrmProxyService;
+ private boolean amrmProxyEnabled = false;
private long waitForContainersOnShutdownMillis;
@@ -235,6 +238,20 @@ public class ContainerManagerImpl extends CompositeService implements
addService(sharedCacheUploader);
dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
+ amrmProxyEnabled =
+ conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+ if (amrmProxyEnabled) {
+ LOG.info("AMRMProxyService is enabled. "
+ + "All the AM->RM requests will be intercepted by the proxy");
+ this.amrmProxyService =
+ new AMRMProxyService(this.context, this.dispatcher);
+ addService(this.amrmProxyService);
+ } else {
+ LOG.info("AMRMProxyService is disabled");
+ }
+
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
@@ -246,6 +263,10 @@ public class ContainerManagerImpl extends CompositeService implements
recover();
}
+ public boolean isARMRMProxyEnabled() {
+ return amrmProxyEnabled;
+ }
+
@SuppressWarnings("unchecked")
private void recover() throws IOException, URISyntaxException {
NMStateStoreService stateStore = context.getNMStateStore();
@@ -314,7 +335,8 @@ public class ContainerManagerImpl extends CompositeService implements
+ " with exit code " + rcs.getExitCode());
if (context.getApplications().containsKey(appId)) {
- Credentials credentials = parseCredentials(launchContext);
+ Credentials credentials =
+ YarnServerSecurityUtils.parseCredentials(launchContext);
Container container = new ContainerImpl(getConfig(), dispatcher,
context.getNMStateStore(), req.getContainerLaunchContext(),
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
@@ -737,8 +759,17 @@ public class ContainerManagerImpl extends CompositeService implements
verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
containerTokenIdentifier);
containerId = containerTokenIdentifier.getContainerID();
- startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
- request);
+
+ // Initialize the AMRMProxy service instance only if the container is of
+ // type AM and if the AMRMProxy service is enabled
+ if (isARMRMProxyEnabled()
+ && containerTokenIdentifier.getContainerType().equals(
+ ContainerType.APPLICATION_MASTER)) {
+ this.amrmProxyService.processApplicationStartRequest(request);
+ }
+
+ startContainerInternal(nmTokenIdentifier,
+ containerTokenIdentifier, request);
succeededContainers.add(containerId);
} catch (YarnException e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
@@ -751,7 +782,7 @@ public class ContainerManagerImpl extends CompositeService implements
}
return StartContainersResponse.newInstance(getAuxServiceMetaData(),
- succeededContainers, failedContainers);
+ succeededContainers, failedContainers);
}
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
@@ -844,7 +875,8 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
- Credentials credentials = parseCredentials(launchContext);
+ Credentials credentials =
+ YarnServerSecurityUtils.parseCredentials(launchContext);
Container container =
new ContainerImpl(getConfig(), this.dispatcher,
@@ -928,27 +960,6 @@ public class ContainerManagerImpl extends CompositeService implements
nmTokenIdentifier);
}
- private Credentials parseCredentials(ContainerLaunchContext launchContext)
- throws IOException {
- Credentials credentials = new Credentials();
- // //////////// Parse credentials
- ByteBuffer tokens = launchContext.getTokens();
-
- if (tokens != null) {
- DataInputByteBuffer buf = new DataInputByteBuffer();
- tokens.rewind();
- buf.reset(tokens);
- credentials.readTokenStorageStream(buf);
- if (LOG.isDebugEnabled()) {
- for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
- LOG.debug(tk.getService() + " = " + tk.toString());
- }
- }
- }
- // //////////// End of parsing credentials
- return credentials;
- }
-
/**
* Stop a list of containers running on this NodeManager.
*/