You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ss...@apache.org on 2013/02/06 20:06:22 UTC
svn commit: r1443134 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/
hadoop-yarn/had...
Author: sseth
Date: Wed Feb 6 19:06:22 2013
New Revision: 1443134
URL: http://svn.apache.org/viewvc?rev=1443134&view=rev
Log:
Merge YARN-355 from trunk. This fixes a bug where RM app submission could jam under load. Contributed by Daryn Sharp.
Removed:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1443134&r1=1443133&r2=1443134&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Feb 6 19:06:22 2013
@@ -216,6 +216,9 @@ Release 2.0.3-alpha - 2013-02-06
YARN-370. Fix SchedulerUtils to correctly round up the resource for
containers. (Zhijie Shen via acmurthy)
+ YARN-355. Fixes a bug where RM app submission could jam under load.
+ (Daryn Sharp via sseth)
+
Release 2.0.2-alpha - 2012-09-07
YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy)
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java?rev=1443134&r1=1443133&r2=1443134&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java Wed Feb 6 19:06:22 2013
@@ -25,13 +25,11 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -47,8 +45,6 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -199,30 +195,6 @@ public class YarnClientImpl extends Abst
}
- // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
- // are part of ClientRMProtocol.
- @Private
- public long renewRMDelegationToken(DelegationToken rmToken)
- throws YarnRemoteException {
- RenewDelegationTokenRequest request = Records
- .newRecord(RenewDelegationTokenRequest.class);
- request.setDelegationToken(rmToken);
- RenewDelegationTokenResponse response = rmClient
- .renewDelegationToken(request);
- return response.getNextExpirationTime();
- }
-
- // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
- // are part of ClietnRMProtocol
- @Private
- public void cancelRMDelegationToken(DelegationToken rmToken)
- throws YarnRemoteException {
- CancelDelegationTokenRequest request = Records
- .newRecord(CancelDelegationTokenRequest.class);
- request.setDelegationToken(rmToken);
- rmClient.cancelDelegationToken(request);
- }
-
private GetQueueInfoRequest
getQueueInfoRequest(String queueName, boolean includeApplications,
boolean includeChildQueues, boolean recursive) {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java?rev=1443134&r1=1443133&r2=1443134&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java Wed Feb 6 19:06:22 2013
@@ -19,10 +19,28 @@
package org.apache.hadoop.yarn.security.client;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
/**
* Delegation Token Identifier that identifies the delegation tokens from the
@@ -51,4 +69,100 @@ public class RMDelegationTokenIdentifier
public Text getKind() {
return KIND_NAME;
}
+
+ public static class Renewer extends TokenRenewer {
+
+ @Override
+ public boolean handleKind(Text kind) {
+ return KIND_NAME.equals(kind);
+ }
+
+ @Override
+ public boolean isManaged(Token<?> token) throws IOException {
+ return true;
+ }
+
+ private static
+ AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> localSecretManager;
+ private static InetSocketAddress localServiceAddress;
+
+ @Private
+ public static void setSecretManager(
+ AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> secretManager,
+ InetSocketAddress serviceAddress) {
+ localSecretManager = secretManager;
+ localServiceAddress = serviceAddress;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public long renew(Token<?> token, Configuration conf) throws IOException,
+ InterruptedException {
+ final ClientRMProtocol rmClient = getRmClient(token, conf);
+ if (rmClient != null) {
+ try {
+ RenewDelegationTokenRequest request =
+ Records.newRecord(RenewDelegationTokenRequest.class);
+ request.setDelegationToken(convertToProtoToken(token));
+ return rmClient.renewDelegationToken(request).getNextExpirationTime();
+ } finally {
+ RPC.stopProxy(rmClient);
+ }
+ } else {
+ return localSecretManager.renewToken(
+ (Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void cancel(Token<?> token, Configuration conf) throws IOException,
+ InterruptedException {
+ final ClientRMProtocol rmClient = getRmClient(token, conf);
+ if (rmClient != null) {
+ try {
+ CancelDelegationTokenRequest request =
+ Records.newRecord(CancelDelegationTokenRequest.class);
+ request.setDelegationToken(convertToProtoToken(token));
+ rmClient.cancelDelegationToken(request);
+ } finally {
+ RPC.stopProxy(rmClient);
+ }
+ } else {
+ localSecretManager.cancelToken(
+ (Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
+ }
+ }
+
+ private static ClientRMProtocol getRmClient(Token<?> token,
+ Configuration conf) {
+ InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
+ if (localSecretManager != null) {
+ // return null if it's our token
+ if (localServiceAddress.getAddress().isAnyLocalAddress()) {
+ if (NetUtils.isLocalAddress(addr.getAddress()) &&
+ addr.getPort() == localServiceAddress.getPort()) {
+ return null;
+ }
+ } else if (addr.equals(localServiceAddress)) {
+ return null;
+ }
+ }
+ final YarnRPC rpc = YarnRPC.create(conf);
+ return (ClientRMProtocol)rpc.getProxy(ClientRMProtocol.class, addr, conf);
+ }
+
+ // get renewer so we can always renew our own tokens
+ @SuppressWarnings("unchecked")
+ private static String getRenewer(Token<?> token) throws IOException {
+ return ((Token<RMDelegationTokenIdentifier>)token).decodeIdentifier()
+ .getRenewer().toString();
+ }
+
+ private static DelegationToken convertToProtoToken(Token<?> token) {
+ return BuilderUtils.newDelegationToken(
+ token.getIdentifier(), token.getKind().toString(),
+ token.getPassword(), token.getService().toString());
+ }
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1443134&r1=1443133&r2=1443134&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Wed Feb 6 19:06:22 2013
@@ -13,3 +13,4 @@
#
org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer
org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer
+org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier$Renewer
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1443134&r1=1443133&r2=1443134&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Feb 6 19:06:22 2013
@@ -157,6 +157,10 @@ public class ClientRMService extends Abs
this.server.start();
clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
server.getListenerAddress());
+ // enable RM to short-circuit token operations directly to itself
+ RMDelegationTokenIdentifier.Renewer.setSecretManager(
+ rmDTSecretManager, clientBindAddress);
+
super.start();
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1443134&r1=1443133&r2=1443134&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Wed Feb 6 19:06:22 2013
@@ -17,13 +17,12 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
@@ -34,9 +33,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
@@ -46,12 +51,14 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.junit.Before;
import org.junit.Test;
@@ -59,6 +66,10 @@ public class TestClientRMTokens {
private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class);
+ @Before
+ public void resetSecretManager() {
+ RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
+ }
@Test
public void testDelegationToken() throws IOException, InterruptedException {
@@ -200,7 +211,122 @@ public class TestClientRMTokens {
RPC.stopProxy(clientRMWithDT);
}
}
+ }
+
+ @Test
+ public void testShortCircuitRenewCancel()
+ throws IOException, InterruptedException {
+ InetSocketAddress addr =
+ new InetSocketAddress(InetAddress.getLocalHost(), 123);
+ checkShortCircuitRenewCancel(addr, addr, true);
+ }
+
+ @Test
+ public void testShortCircuitRenewCancelWildcardAddress()
+ throws IOException, InterruptedException {
+ InetSocketAddress rmAddr = new InetSocketAddress(123);
+ checkShortCircuitRenewCancel(
+ rmAddr,
+ new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()),
+ true);
+ }
+
+ @Test
+ public void testShortCircuitRenewCancelSameHostDifferentPort()
+ throws IOException, InterruptedException {
+ InetSocketAddress rmAddr =
+ new InetSocketAddress(InetAddress.getLocalHost(), 123);
+ checkShortCircuitRenewCancel(
+ rmAddr,
+ new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1),
+ false);
+ }
+
+ @Test
+ public void testShortCircuitRenewCancelDifferentHostSamePort()
+ throws IOException, InterruptedException {
+ InetSocketAddress rmAddr =
+ new InetSocketAddress(InetAddress.getLocalHost(), 123);
+ checkShortCircuitRenewCancel(
+ rmAddr,
+ new InetSocketAddress("1.1.1.1", rmAddr.getPort()),
+ false);
+ }
+
+ @Test
+ public void testShortCircuitRenewCancelDifferentHostDifferentPort()
+ throws IOException, InterruptedException {
+ InetSocketAddress rmAddr =
+ new InetSocketAddress(InetAddress.getLocalHost(), 123);
+ checkShortCircuitRenewCancel(
+ rmAddr,
+ new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1),
+ false);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr,
+ InetSocketAddress serviceAddr,
+ boolean shouldShortCircuit
+ ) throws IOException, InterruptedException {
+ Configuration conf = new Configuration();
+ conf.setClass(YarnConfiguration.IPC_RPC_IMPL,
+ YarnBadRPC.class, YarnRPC.class);
+ RMDelegationTokenSecretManager secretManager =
+ mock(RMDelegationTokenSecretManager.class);
+ RMDelegationTokenIdentifier.Renewer.setSecretManager(secretManager, rmAddr);
+
+ RMDelegationTokenIdentifier ident = new RMDelegationTokenIdentifier(
+ new Text("owner"), new Text("renewer"), null);
+ Token<RMDelegationTokenIdentifier> token =
+ new Token<RMDelegationTokenIdentifier>(ident, secretManager);
+
+ SecurityUtil.setTokenService(token, serviceAddr);
+ if (shouldShortCircuit) {
+ token.renew(conf);
+ verify(secretManager).renewToken(eq(token), eq("renewer"));
+ reset(secretManager);
+ token.cancel(conf);
+ verify(secretManager).cancelToken(eq(token), eq("renewer"));
+ } else {
+ try {
+ token.renew(conf);
+ fail();
+ } catch (RuntimeException e) {
+ assertEquals("getProxy", e.getMessage());
+ }
+ verify(secretManager, never()).renewToken(any(Token.class), anyString());
+ try {
+ token.cancel(conf);
+ fail();
+ } catch (RuntimeException e) {
+ assertEquals("getProxy", e.getMessage());
+ }
+ verify(secretManager, never()).cancelToken(any(Token.class), anyString());
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static class YarnBadRPC extends YarnRPC {
+ @Override
+ public Object getProxy(Class protocol, InetSocketAddress addr,
+ Configuration conf) {
+ throw new RuntimeException("getProxy");
+ }
+
+ @Override
+ public void stopProxy(Object proxy, Configuration conf) {
+ throw new RuntimeException("stopProxy");
+ }
+
+ @Override
+ public Server getServer(Class protocol, Object instance,
+ InetSocketAddress addr, Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager,
+ int numHandlers, String portRangeConfig) {
+ throw new RuntimeException("getServer");
+ }
}
// Get the delegation token directly as it is a little difficult to setup