You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ss...@apache.org on 2013/02/06 20:07:04 UTC

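svn commit: r1443136 - in /hadoop/common/branches/branch-0.23/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/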

Author: sseth
Date: Wed Feb  6 19:07:03 2013
New Revision: 1443136

URL: http://svn.apache.org/viewvc?rev=1443136&view=rev
Log:
YARN-355. Fixes a bug where RM app submission could jam under load. Contributed by Daryn Sharp.

Removed:
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt?rev=1443136&r1=1443135&r2=1443136&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt Wed Feb  6 19:07:03 2013
@@ -32,6 +32,9 @@ Release 0.23.7 - UNRELEASED
 
     YARN-40. Provide support for missing yarn commands (Devaraj K via tgraves)
 
+    YARN-355. Fixes a bug where RM app submission could jam under load.
+    (Daryn Sharp via sseth)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java?rev=1443136&r1=1443135&r2=1443136&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java Wed Feb  6 19:07:03 2013
@@ -19,10 +19,30 @@
 package org.apache.hadoop.yarn.security.client;
 
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 
 /**
  * Delegation Token Identifier that identifies the delegation tokens from the 
@@ -51,4 +71,102 @@ public class RMDelegationTokenIdentifier
   public Text getKind() {
     return KIND_NAME;
   }
+  
+  public static class Renewer extends TokenRenewer { // renews/cancels tokens of kind KIND_NAME
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return KIND_NAME.equals(kind); // handles only this identifier's token kind
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true; // unconditionally reports the token as managed
+    }
+
+    private static // both statics stay null until setSecretManager() is called
+    AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> localSecretManager;
+    private static InetSocketAddress localServiceAddress; // compared with a token's service address
+    
+    @Private // registers the in-process secret manager and the address it serves
+    public static void setSecretManager(
+        AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> secretManager,
+        InetSocketAddress serviceAddress) {
+      localSecretManager = secretManager;
+      localServiceAddress = serviceAddress;
+    }
+    
+    @SuppressWarnings("unchecked") // for the Token<RMDelegationTokenIdentifier> cast below
+    @Override
+    public long renew(Token<?> token, Configuration conf) throws IOException,
+        InterruptedException {
+      final ClientRMProtocol rmClient = getRmClient(token, conf); // null => renew locally
+      if (rmClient != null) {
+        try {
+          RenewDelegationTokenRequest request =
+              Records.newRecord(RenewDelegationTokenRequest.class);
+          request.setDelegationToken(convertToProtoToken(token));
+          return rmClient.renewDelegationToken(request).getNextExpirationTime();
+        } finally {
+          RPC.stopProxy(rmClient); // stop the proxy whether or not the call succeeded
+        }
+      } else {
+        return localSecretManager.renewToken( // short-circuit: no RPC proxy involved
+            (Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
+      }
+    }
+
+    @SuppressWarnings("unchecked") // for the Token<RMDelegationTokenIdentifier> cast below
+    @Override
+    public void cancel(Token<?> token, Configuration conf) throws IOException,
+        InterruptedException {
+      final ClientRMProtocol rmClient = getRmClient(token, conf); // null => cancel locally
+      if (rmClient != null) {
+        try {
+          CancelDelegationTokenRequest request =
+              Records.newRecord(CancelDelegationTokenRequest.class);
+          request.setDelegationToken(convertToProtoToken(token));
+          rmClient.cancelDelegationToken(request);
+        } finally {
+          RPC.stopProxy(rmClient); // stop the proxy whether or not the call succeeded
+        }
+      } else {
+        localSecretManager.cancelToken( // short-circuit: no RPC proxy involved
+            (Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
+      }
+    }
+    
+    private static ClientRMProtocol getRmClient(Token<?> token, // returns null for "our" tokens
+        Configuration conf) {
+      InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
+      if (localSecretManager != null) {
+        // return null when the token's service matches the address given to setSecretManager
+        if (localServiceAddress.getAddress().isAnyLocalAddress()) { // registered as wildcard
+            if (NetUtils.isLocalAddress(addr.getAddress()) && // any local address qualifies...
+                addr.getPort() == localServiceAddress.getPort()) { // ...if the port matches
+              return null;
+            }
+        } else if (addr.equals(localServiceAddress)) { // otherwise require an exact match
+          return null;
+        }
+      }
+      final YarnRPC rpc = YarnRPC.create(conf); // no local match: build an RPC proxy to addr
+      return (ClientRMProtocol)rpc.getProxy(ClientRMProtocol.class, addr, conf);        
+    }
+    
+    // read the renewer recorded in the token's identifier so we can always renew our own tokens
+    private static String getRenewer(Token<?> token) throws IOException {
+      RMDelegationTokenIdentifier id = new RMDelegationTokenIdentifier();
+      DataInputStream in = new DataInputStream(
+          new ByteArrayInputStream(token.getIdentifier()));
+      id.readFields(in); // deserialize the identifier from the token's raw bytes
+      return id.getRenewer().toString();
+    }
+
+    private static DelegationToken convertToProtoToken(Token<?> token) { // Token -> DelegationToken record
+      return BuilderUtils.newDelegationToken(
+          token.getIdentifier(), token.getKind().toString(),
+          token.getPassword(), token.getService().toString());
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1443136&r1=1443135&r2=1443136&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Wed Feb  6 19:07:03 2013
@@ -13,4 +13,4 @@
 
 org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer
 org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer
-org.apache.hadoop.yarn.security.RMDelegationTokenRenewer
+org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier$Renewer

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1443136&r1=1443135&r2=1443136&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Feb  6 19:07:03 2013
@@ -157,6 +157,10 @@ public class ClientRMService extends Abs
     this.server.start();
     clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
                                                server.getListenerAddress());
+    // enable RM to short-circuit token operations directly to itself
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(
+        rmDTSecretManager, clientBindAddress);
+    
     super.start();
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1443136&r1=1443135&r2=1443136&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Wed Feb  6 19:07:03 2013
@@ -17,13 +17,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
@@ -34,9 +33,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
@@ -46,12 +51,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.junit.Before;
 import org.junit.Test;
 
 
@@ -59,6 +66,10 @@ public class TestClientRMTokens {
 
   private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class);
   
+  @Before // clears the Renewer's static secret manager and address before each test
+  public void resetSecretManager() {
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
+  }
   
   @Test
   public void testDelegationToken() throws IOException, InterruptedException {
@@ -202,7 +213,122 @@ public class TestClientRMTokens {
         RPC.stopProxy(clientRMWithDT);
       }
     }
+  }
+  
+  @Test // token service address is the very same address as the RM's: expect short-circuit
+  public void testShortCircuitRenewCancel()
+      throws IOException, InterruptedException {
+    InetSocketAddress addr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(addr, addr, true);
+  }
+
+  @Test // RM registered on the wildcard address; token names the local host on the same port
+  public void testShortCircuitRenewCancelWildcardAddress()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr = new InetSocketAddress(123); // port-only ctor => wildcard address
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()),
+        true);
+  }
+
+  @Test // same host as the RM but a different port: must not short-circuit
+  public void testShortCircuitRenewCancelSameHostDifferentPort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1),
+        false);
+  }
+
+  @Test // same port as the RM but a different host (1.1.1.1): must not short-circuit
+  public void testShortCircuitRenewCancelDifferentHostSamePort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress("1.1.1.1", rmAddr.getPort()),
+        false);
+  }
+
+  @Test // both host (1.1.1.1) and port differ from the RM's: must not short-circuit
+  public void testShortCircuitRenewCancelDifferentHostDifferentPort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1),
+        false);
+  }
+
+  @SuppressWarnings("unchecked") // any(Token.class) yields a raw Token matcher
+  private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr, // address registered as the RM's
+                                            InetSocketAddress serviceAddr, // address stamped on the token
+                                            boolean shouldShortCircuit // expect local handling?
+      ) throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.IPC_RPC_IMPL, // make every proxy creation throw "getProxy"
+        YarnBadRPC.class, YarnRPC.class);
     
+    RMDelegationTokenSecretManager secretManager =
+        mock(RMDelegationTokenSecretManager.class);
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(secretManager, rmAddr);
+
+    RMDelegationTokenIdentifier ident = new RMDelegationTokenIdentifier(
+        new Text("owner"), new Text("renewer"), null);
+    Token<RMDelegationTokenIdentifier> token =
+        new Token<RMDelegationTokenIdentifier>(ident, secretManager);
+
+    SecurityUtil.setTokenService(token, serviceAddr);
+    if (shouldShortCircuit) {
+      token.renew(conf);
+      verify(secretManager).renewToken(eq(token), eq("renewer")); // went to the mock, not RPC
+      reset(secretManager); // forget the renew interaction before checking cancel
+      token.cancel(conf);
+      verify(secretManager).cancelToken(eq(token), eq("renewer"));
+    } else {      
+      try { 
+        token.renew(conf);
+        fail(); // renew must have attempted an RPC proxy and thrown
+      } catch (RuntimeException e) {
+        assertEquals("getProxy", e.getMessage()); // thrown by YarnBadRPC.getProxy
+      }
+      verify(secretManager, never()).renewToken(any(Token.class), anyString());
+      try { 
+        token.cancel(conf);
+        fail(); // cancel must have attempted an RPC proxy and thrown
+      } catch (RuntimeException e) {
+        assertEquals("getProxy", e.getMessage()); // thrown by YarnBadRPC.getProxy
+      }
+      verify(secretManager, never()).cancelToken(any(Token.class), anyString());
+    }
+  }
+  
+  @SuppressWarnings("rawtypes") // overridden signatures use the raw Class type
+  public static class YarnBadRPC extends YarnRPC { // YarnRPC stub: every operation throws
+    @Override
+    public Object getProxy(Class protocol, InetSocketAddress addr,
+        Configuration conf) {
+      throw new RuntimeException("getProxy"); // message is asserted by checkShortCircuitRenewCancel
+    }
+
+    @Override
+    public void stopProxy(Object proxy, Configuration conf) {
+      throw new RuntimeException("stopProxy"); // message names the method that was called
+    }
+
+    @Override
+    public Server getServer(Class protocol, Object instance,
+        InetSocketAddress addr, Configuration conf,
+        SecretManager<? extends TokenIdentifier> secretManager,
+        int numHandlers, String portRangeConfig) {
+      throw new RuntimeException("getServer"); // message names the method that was called
+    }
   }
   
   // Get the delegation token directly as it is a little difficult to setup