You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ji...@apache.org on 2011/10/14 03:24:22 UTC

svn commit: r1183187 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoo...

Author: jitendra
Date: Fri Oct 14 01:24:20 2011
New Revision: 1183187

URL: http://svn.apache.org/viewvc?rev=1183187&view=rev
Log:
MAPREDUCE-2764. Fix renewal of dfs delegation tokens. Contributed by Owen.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/META-INF/
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/META-INF/services/
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1183187&r1=1183186&r2=1183187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Oct 14 01:24:20 2011
@@ -48,6 +48,8 @@ Trunk (unreleased changes)
     MAPREDUCE-3183. hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml 
     missing license header. (Hitesh Shah via tucu).
 
+    MAPREDUCE-2764. Fix renewal of dfs delegation tokens. (Owen via jitendra)
+
 Release 0.23.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1183187&r1=1183186&r2=1183187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Fri Oct 14 01:24:20 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -459,6 +460,37 @@ public class JobClient extends CLI {
     cluster = new Cluster(conf);
   }
 
+  @InterfaceAudience.Private
+  public static class Renewer extends TokenRenewer {
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return DelegationTokenIdentifier.MAPREDUCE_DELEGATION_KIND.equals(kind);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public long renew(Token<?> token, Configuration conf
+                      ) throws IOException, InterruptedException {
+      return new Cluster(conf).
+        renewDelegationToken((Token<DelegationTokenIdentifier>) token);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token, Configuration conf
+                       ) throws IOException, InterruptedException {
+      new Cluster(conf).
+        cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+    
+  }
+
   /**
    * Build a job client, connect to the indicated job tracker.
    * 
@@ -1048,22 +1080,24 @@ public class JobClient extends CLI {
    * @return true if the renewal went well
    * @throws InvalidToken
    * @throws IOException
+   * @deprecated Use {@link Token.renew} instead
    */
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws InvalidToken, IOException, 
                                             InterruptedException {
-    return cluster.renewDelegationToken(token);
+    return token.renew(getConf());
   }
 
   /**
    * Cancel a delegation token from the JobTracker
    * @param token the token to cancel
    * @throws IOException
+   * @deprecated Use {@link Token.cancel} instead
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                     ) throws InvalidToken, IOException, 
                                              InterruptedException {
-    cluster.cancelDelegationToken(token);
+    token.cancel(getConf());
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java?rev=1183187&r1=1183186&r2=1183187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java Fri Oct 14 01:24:20 2011
@@ -371,6 +371,7 @@ public class Cluster {
    * @return the new expiration time
    * @throws InvalidToken
    * @throws IOException
+   * @deprecated Use {@link Token.renew} instead
    */
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws InvalidToken, IOException,
@@ -387,6 +388,7 @@ public class Cluster {
    * Cancel a delegation token from the JobTracker
    * @param token the token to cancel
    * @throws IOException
+   * @deprecated Use {@link Token.cancel} instead
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                     ) throws IOException,

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1183187&r1=1183186&r2=1183187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Fri Oct 14 01:24:20 2011
@@ -19,8 +19,6 @@
 package org.apache.hadoop.mapreduce.security.token;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,18 +35,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.util.StringUtils;
 
@@ -64,14 +54,14 @@ public class DelegationTokenRenewal {
    *
    */
   private static class DelegationTokenToRenew {
-    public final Token<DelegationTokenIdentifier> token;
+    public final Token<?> token;
     public final JobID jobId;
     public final Configuration conf;
     public long expirationDate;
     public TimerTask timerTask;
     
     public DelegationTokenToRenew(
-        JobID jId, Token<DelegationTokenIdentifier> t, 
+        JobID jId, Token<?> t, 
         Configuration newConf, long newExpirationDate) {
       token = t;
       jobId = jId;
@@ -124,10 +114,9 @@ public class DelegationTokenRenewal {
   
   private static class DelegationTokenCancelThread extends Thread {
     private static class TokenWithConf {
-      Token<DelegationTokenIdentifier> token;
+      Token<?> token;
       Configuration conf;
-      TokenWithConf(Token<DelegationTokenIdentifier> token,  
-          Configuration conf) {
+      TokenWithConf(Token<?> token, Configuration conf) {
         this.token = token;
         this.conf = conf;
       }
@@ -139,7 +128,7 @@ public class DelegationTokenRenewal {
       super("Delegation Token Canceler");
       setDaemon(true);
     }
-    public void cancelToken(Token<DelegationTokenIdentifier> token,  
+    public void cancelToken(Token<?> token,  
         Configuration conf) {
       TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
       while (!queue.offer(tokenWithConf)) {
@@ -158,25 +147,21 @@ public class DelegationTokenRenewal {
         TokenWithConf tokenWithConf = null;
         try {
           tokenWithConf = queue.take();
-          DistributedFileSystem dfs = null;
-          try {
-            // do it over rpc. For that we need DFS object
-            dfs = getDFSForToken(tokenWithConf.token, tokenWithConf.conf);
-          } catch (Exception e) {
-            LOG.info("couldn't get DFS to cancel. Will retry over HTTPS");
-            dfs = null;
-          }
-      
-          if(dfs != null) {
-            dfs.cancelDelegationToken(tokenWithConf.token);
-          } else {
-            cancelDelegationTokenOverHttps(tokenWithConf.token, 
-                                           tokenWithConf.conf);
-          }
+          final TokenWithConf current = tokenWithConf;
+          
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Canceling token " + tokenWithConf.token.getService() +  
-                " for dfs=" + dfs);
+            LOG.debug("Canceling token " + tokenWithConf.token.getService());
           }
+          // need to use doAs so that http can find the kerberos tgt
+          UserGroupInformation.getLoginUser().doAs(
+              new PrivilegedExceptionAction<Void>() {
+
+                @Override
+                public Void run() throws Exception {
+                  current.token.cancel(current.conf);
+                  return null;
+                }
+              });
         } catch (IOException e) {
           LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +  
               StringUtils.stringifyException(e));
@@ -195,119 +180,29 @@ public class DelegationTokenRenewal {
     delegationTokens.add(t);
   }
   
-  // kind of tokens we currently renew
-  private static final Text kindHdfs = 
-    DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
-  
-  @SuppressWarnings("unchecked")
   public static synchronized void registerDelegationTokensForRenewal(
-      JobID jobId, Credentials ts, Configuration conf) {
+      JobID jobId, Credentials ts, Configuration conf) throws IOException {
     if(ts==null)
       return; //nothing to add
     
-    Collection <Token<? extends TokenIdentifier>> tokens = ts.getAllTokens();
+    Collection <Token<?>> tokens = ts.getAllTokens();
     long now = System.currentTimeMillis();
-    
-    for(Token<? extends TokenIdentifier> t : tokens) {
-      // currently we only check for HDFS delegation tokens
-      // later we can add more different types.
-      if(! t.getKind().equals(kindHdfs)) {
-        continue; 
-      }
-      Token<DelegationTokenIdentifier> dt = 
-        (Token<DelegationTokenIdentifier>)t;
-      
-      // first renew happens immediately
-      DelegationTokenToRenew dtr = 
-        new DelegationTokenToRenew(jobId, dt, conf, now); 
-
-      addTokenToList(dtr);
-      
-      setTimerForTokenRenewal(dtr, true);
-      LOG.info("registering token for renewal for service =" + dt.getService()+
-          " and jobID = " + jobId);
-    }
-  }
-  
-  private static String getHttpAddressForToken(
-      Token<DelegationTokenIdentifier> token, final Configuration conf) 
-  throws IOException {
-
-    String[] ipaddr = token.getService().toString().split(":");
 
-    InetAddress iaddr = InetAddress.getByName(ipaddr[0]);
-    String dnsName = iaddr.getCanonicalHostName();
-    
-    // in case it is a different cluster it may have a different port
-    String httpsPort = conf.get("dfs.hftp.https.port");
-    if(httpsPort == null) {
-      // get from this cluster
-      httpsPort = conf.get(DFSConfigKeys.DFS_HTTPS_PORT_KEY, 
-          "" + DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
-    }
-
-    // always use https (it is for security only)
-    return "https://" + dnsName+":"+httpsPort;
-  }
-
-  protected static long renewDelegationTokenOverHttps(
-      final Token<DelegationTokenIdentifier> token, final Configuration conf) 
-  throws InterruptedException, IOException{
-    final String httpAddress = getHttpAddressForToken(token, conf);
-    // will be chaged to debug
-    LOG.info("address to renew=" + httpAddress + "; tok=" + token.getService());
-    Long expDate = (Long) UserGroupInformation.getLoginUser().doAs(
-        new PrivilegedExceptionAction<Long>() {
-          public Long run() throws IOException {
-            return DelegationTokenFetcher.renewDelegationToken(httpAddress, token);  
-          }
-        });
-    LOG.info("Renew over HTTP done. addr="+httpAddress+";res="+expDate);
-    return expDate;
-  }
-  
-  private static long renewDelegationToken(DelegationTokenToRenew dttr) 
-  throws Exception {
-    long newExpirationDate=System.currentTimeMillis()+3600*1000;
-    Token<DelegationTokenIdentifier> token = dttr.token;
-    Configuration conf = dttr.conf;
-    if(token.getKind().equals(kindHdfs)) {
-      DistributedFileSystem dfs=null;
-    
-      try {
-        // do it over rpc. For that we need DFS object
-        dfs = getDFSForToken(token, conf);
-      } catch (IOException e) {
-        LOG.info("couldn't get DFS to renew. Will retry over HTTPS");
-        dfs = null;
-      }
-      
-      try {
-        if(dfs != null)
-          newExpirationDate = dfs.renewDelegationToken(token);
-        else {
-          // try HTTP
-          newExpirationDate = renewDelegationTokenOverHttps(token, conf);
-        }
-      } catch (InvalidToken ite) {
-        LOG.warn("invalid token - not scheduling for renew");
-        removeFailedDelegationToken(dttr);
-        throw new IOException("failed to renew token", ite);
-      } catch (AccessControlException ioe) {
-        LOG.warn("failed to renew token:"+token, ioe);
-        removeFailedDelegationToken(dttr);
-        throw new IOException("failed to renew token", ioe);
-      } catch (Exception e) {
-        LOG.warn("failed to renew token:"+token, e);
-        // returns default expiration date
+    for (Token<?> t : tokens) {
+      // first renew happens immediately
+      if (t.isManaged()) {
+        DelegationTokenToRenew dtr = new DelegationTokenToRenew(jobId, t, conf,
+            now);
+
+        addTokenToList(dtr);
+
+        setTimerForTokenRenewal(dtr, true);
+        LOG.info("registering token for renewal for service =" + t.getService()
+            + " and jobID = " + jobId);
       }
-    } else {
-      throw new Exception("unknown token type to renew:"+token.getKind());
     }
-    return newExpirationDate;
   }
-
-  
+    
   /**
    * Task - to renew a token
    *
@@ -319,43 +214,31 @@ public class DelegationTokenRenewal {
     
     @Override
     public void run() {
-      Token<DelegationTokenIdentifier> token = dttr.token;
+      Token<?> token = dttr.token;
       long newExpirationDate=0;
       try {
-        newExpirationDate = renewDelegationToken(dttr);
+        // need to use doAs so that http can find the kerberos tgt
+        dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
+            new PrivilegedExceptionAction<Long>() {
+
+              @Override
+              public Long run() throws Exception {
+                return dttr.token.renew(dttr.conf);
+              }
+            });
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("renewing for:" + token.getService() + ";newED="
+              + dttr.expirationDate);
+        }
+        setTimerForTokenRenewal(dttr, false);// set the next one
       } catch (Exception e) {
-        return; // message logged in renewDT method
+        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
+        removeFailedDelegationToken(dttr);
       }
-      if (LOG.isDebugEnabled())
-        LOG.debug("renewing for:"+token.getService()+";newED=" + 
-            newExpirationDate);
-      
-      // new expiration date
-      dttr.expirationDate = newExpirationDate;
-      setTimerForTokenRenewal(dttr, false);// set the next one
     }
   }
   
-  private static DistributedFileSystem getDFSForToken(
-      Token<DelegationTokenIdentifier> token, final Configuration conf) 
-  throws Exception {
-    DistributedFileSystem dfs = null;
-    try {
-      final URI uri = new URI (SCHEME + "://" + token.getService().toString());
-      dfs = 
-      UserGroupInformation.getLoginUser().doAs(
-          new PrivilegedExceptionAction<DistributedFileSystem>() {
-        public DistributedFileSystem run() throws IOException {
-          return (DistributedFileSystem) FileSystem.get(uri, conf);  
-        }
-      });
-    } catch (Exception e) {
-      LOG.warn("Failed to create a dfs to renew/cancel for:" + token.getService(), e);
-      throw e;
-    } 
-    return dfs;
-  }
-  
   /**
    * find the soonest expiring token and set it for renew
    */
@@ -372,15 +255,11 @@ public class DelegationTokenRenewal {
       renewIn = now + expiresIn - expiresIn/10; // little before expiration
     }
     
-    try {
-      // need to create new timer every time
-      TimerTask tTask = new RenewalTimerTask(token);
-      token.setTimerTask(tTask); // keep reference to the timer
-
-      renewalTimer.schedule(token.timerTask, new Date(renewIn));
-    } catch (Exception e) {
-      LOG.warn("failed to schedule a task, token will not renew more", e);
-    }
+    // need to create new timer every time
+    TimerTask tTask = new RenewalTimerTask(token);
+    token.setTimerTask(tTask); // keep reference to the timer
+
+    renewalTimer.schedule(token.timerTask, new Date(renewIn));
   }
 
   /**
@@ -391,33 +270,9 @@ public class DelegationTokenRenewal {
     delegationTokens.clear();
   }
   
-  
-  protected static void cancelDelegationTokenOverHttps(
-      final Token<DelegationTokenIdentifier> token, final Configuration conf) 
-  throws InterruptedException, IOException{
-    final String httpAddress = getHttpAddressForToken(token, conf);
-    // will be chaged to debug
-    LOG.info("address to cancel=" + httpAddress + "; tok=" + token.getService());
-    
-    UserGroupInformation.getLoginUser().doAs(
-        new PrivilegedExceptionAction<Void>() {
-          public Void run() throws IOException {
-            DelegationTokenFetcher.cancelDelegationToken(httpAddress, token);
-            return null;
-          }
-        });
-    LOG.info("Cancel over HTTP done. addr="+httpAddress);
-  }
-  
-  
   // cancel a token
   private static void cancelToken(DelegationTokenToRenew t) {
-    Token<DelegationTokenIdentifier> token = t.token;
-    Configuration conf = t.conf;
-    
-    if(token.getKind().equals(kindHdfs)) {
-      dtCancelThread.cancelToken(token, conf);
-    }
+    dtCancelThread.cancelToken(t.token, t.conf);
   }
   
   /**

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java?rev=1183187&r1=1183186&r2=1183187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java Fri Oct 14 01:24:20 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -35,7 +36,7 @@ import org.apache.hadoop.security.UserGr
 @InterfaceStability.Unstable
 public class JobTokenIdentifier extends TokenIdentifier {
   private Text jobid;
-  final static Text KIND_NAME = new Text("mapreduce.job");
+  public final static Text KIND_NAME = new Text("mapreduce.job");
   
   /**
    * Default constructor
@@ -86,4 +87,12 @@ public class JobTokenIdentifier extends 
   public void write(DataOutput out) throws IOException {
     jobid.write(out);
   }
+
+  @InterfaceAudience.Private
+  public static class Renewer extends Token.TrivialRenewer {
+    @Override
+    protected Text getKind() {
+      return KIND_NAME;
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java?rev=1183187&r1=1183186&r2=1183187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java Fri Oct 14 01:24:20 2011
@@ -30,7 +30,7 @@ import org.apache.hadoop.security.token.
 @InterfaceStability.Unstable
 public class DelegationTokenIdentifier 
     extends AbstractDelegationTokenIdentifier {
-  static final Text MAPREDUCE_DELEGATION_KIND = 
+  public static final Text MAPREDUCE_DELEGATION_KIND = 
     new Text("MAPREDUCE_DELEGATION_TOKEN");
 
   /**

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1183187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Fri Oct 14 01:24:20 2011
@@ -0,0 +1,2 @@
+org.apache.hadoop.mapred.JobClient$Renewer
+org.apache.hadoop.mapreduce.security.token.JobTokenIndentifier$Renewer
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java?rev=1183187&r1=1183186&r2=1183187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java Fri Oct 14 01:24:20 2011
@@ -22,8 +22,10 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
@@ -74,4 +76,11 @@ public class ApplicationTokenIdentifier 
     return UserGroupInformation.createRemoteUser(appId.toString());
   }
 
+  @InterfaceAudience.Private
+  public static class Renewer extends Token.TrivialRenewer {
+    @Override
+    protected Text getKind() {
+      return KIND_NAME;
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java?rev=1183187&r1=1183186&r2=1183187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java Fri Oct 14 01:24:20 2011
@@ -24,8 +24,10 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -115,4 +117,12 @@ public class ContainerTokenIdentifier ex
     return UserGroupInformation.createRemoteUser(this.containerId.toString());
   }
 
+
+  @InterfaceAudience.Private
+  public static class Renewer extends Token.TrivialRenewer {
+    @Override
+    protected Text getKind() {
+      return KIND;
+    }
+  }
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1183187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Fri Oct 14 01:24:20 2011
@@ -0,0 +1,2 @@
+org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer
+org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/test/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/test/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1183187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/test/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/test/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Fri Oct 14 01:24:20 2011
@@ -0,0 +1 @@
+org.apache.hadoop.mapreduce.security.token.TestDelegationTokenRenewal$Renewer

Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=1183187&r1=1183186&r2=1183187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java Fri Oct 14 01:24:20 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.secu
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.URI;
@@ -41,6 +42,7 @@ import org.apache.hadoop.mapreduce.JobID
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -54,6 +56,53 @@ public class TestDelegationTokenRenewal 
   private static final Log LOG = 
       LogFactory.getLog(TestDelegationTokenRenewal.class);
 
+  private static final Text KIND = 
+    new Text("TestDelegationTokenRenewal.Token");
+
+  public static class Renewer extends TokenRenewer {
+    private static int counter = 0;
+    private static Token<?> lastRenewed = null;
+    private static Token<?> tokenToRenewIn2Sec = null;
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return KIND.equals(kind);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+
+    @Override
+    public long renew(Token<?> t, Configuration conf) throws IOException {
+      MyToken token = (MyToken)t;
+      if(token.isCanceled()) {
+        throw new InvalidToken("token has been canceled");
+      }
+      lastRenewed = token;
+      counter ++;
+      LOG.info("Called MYDFS.renewdelegationtoken " + token + 
+               ";this dfs=" + this.hashCode() + ";c=" + counter);
+      if(tokenToRenewIn2Sec == token) { 
+        // this token first renewal in 2 seconds
+        LOG.info("RENEW in 2 seconds");
+        tokenToRenewIn2Sec=null;
+        return 2*1000 + System.currentTimeMillis();
+      } else {
+        return 86400*1000 + System.currentTimeMillis();
+      }
+    }
+
+    @Override
+    public void cancel(Token<?> t, Configuration conf) {
+      MyToken token = (MyToken)t;
+      LOG.info("Cancel token " + token);
+      token.cancelToken();
+   }
+
+  }
+
   private static Configuration conf;
  
   @BeforeClass
@@ -66,7 +115,7 @@ public class TestDelegationTokenRenewal 
     System.out.println("scheme is : " + uri.getScheme());
     conf.setClass("fs." + uri.getScheme() + ".impl", MyFS.class, DistributedFileSystem.class);
     FileSystem.setDefaultUri(conf, uri);
-    System.out.println("filesystem uri = " + FileSystem.getDefaultUri(conf).toString());
+    LOG.info("filesystem uri = " + FileSystem.getDefaultUri(conf).toString());
   }
   
   private static class MyDelegationTokenSecretManager extends DelegationTokenSecretManager {
@@ -97,11 +146,14 @@ public class TestDelegationTokenRenewal 
     public MyToken(DelegationTokenIdentifier dtId1,
         MyDelegationTokenSecretManager sm) {
       super(dtId1, sm);
+      setKind(KIND);
       status = "GOOD";
     }
     
     public boolean isCanceled() {return status.equals(CANCELED);}
+
     public void cancelToken() {this.status=CANCELED;}
+
     public String toString() {
       StringBuilder sb = new StringBuilder(1024);
       
@@ -127,50 +179,19 @@ public class TestDelegationTokenRenewal 
    * exception
    */
   static class MyFS extends DistributedFileSystem {
-    int counter=0;
-    MyToken token;
-    MyToken tokenToRenewIn2Sec;
     
     public MyFS() {}
     public void close() {}
     @Override
     public void initialize(URI uri, Configuration conf) throws IOException {}
     
-    @Override
-    public long renewDelegationToken(Token<DelegationTokenIdentifier> t)
-    throws InvalidToken, IOException {
-      MyToken token = (MyToken)t;
-      if(token.isCanceled()) {
-        throw new InvalidToken("token has been canceled");
-      }
-      counter ++;
-      this.token = (MyToken)token;
-      System.out.println("Called MYDFS.renewdelegationtoken " + token);
-      if(tokenToRenewIn2Sec == token) { 
-        // this token first renewal in 2 seconds
-        System.out.println("RENEW in 2 seconds");
-        tokenToRenewIn2Sec=null;
-        return 2*1000 + System.currentTimeMillis();
-      } else {
-        return 86400*1000 + System.currentTimeMillis();
-      }
-    }
     @Override 
-    public MyToken getDelegationToken(Text renewer)
-    throws IOException {
-      System.out.println("Called MYDFS.getdelegationtoken");
-      return createTokens(renewer);
-    }
-    @Override
-    public void cancelDelegationToken(Token<DelegationTokenIdentifier> t)
-    throws IOException {
-      MyToken token = (MyToken)t;
-      token.cancelToken();
+    public MyToken getDelegationToken(Text renewer) throws IOException {
+      MyToken result = createTokens(renewer);
+      LOG.info("Called MYDFS.getdelegationtoken " + result);
+      return result;
     }
 
-    public void setTokenToRenewIn2Sec(MyToken t) {tokenToRenewIn2Sec=t;}
-    public int getCounter() {return counter; }
-    public MyToken getToken() {return token;}
   }
   
   /**
@@ -218,9 +239,9 @@ public class TestDelegationTokenRenewal 
    * @throws URISyntaxException
    */
   @Test
-  public void testDTRenewal () throws IOException, URISyntaxException {
+  public void testDTRenewal () throws Exception {
     MyFS dfs = (MyFS)FileSystem.get(conf);
-    System.out.println("dfs="+(Object)dfs);
+    LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
     // Test 1. - add three tokens - make sure exactly one get's renewed
     
     // get the delegation tokens
@@ -230,8 +251,8 @@ public class TestDelegationTokenRenewal 
     token3 = dfs.getDelegationToken(new Text("user3"));
 
     //to cause this one to be set for renew in 2 secs
-    dfs.setTokenToRenewIn2Sec(token1); 
-    System.out.println("token="+token1+" should be renewed for 2 secs");
+    Renewer.tokenToRenewIn2Sec = token1;
+    LOG.info("token="+token1+" should be renewed for 2 secs");
     
     // two distinct Namenodes
     String nn1 = DelegationTokenRenewal.SCHEME + "://host1:0";
@@ -258,15 +279,13 @@ public class TestDelegationTokenRenewal 
       } catch (InterruptedException e) {}
       
       // since we cannot guarantee timely execution - let's give few chances
-      if(dfs.getCounter()==numberOfExpectedRenewals)
+      if(Renewer.counter==numberOfExpectedRenewals)
         break;
     }
     
-    System.out.println("Counter = " + dfs.getCounter() + ";t="+
-        dfs.getToken());
     assertEquals("renew wasn't called as many times as expected(4):",
-        numberOfExpectedRenewals, dfs.getCounter());
-    assertEquals("most recently renewed token mismatch", dfs.getToken(), 
+        numberOfExpectedRenewals, Renewer.counter);
+    assertEquals("most recently renewed token mismatch", Renewer.lastRenewed, 
         token1);
     
     // Test 2. 
@@ -277,8 +296,8 @@ public class TestDelegationTokenRenewal 
     MyToken token4 = dfs.getDelegationToken(new Text("user4"));
     
     //to cause this one to be set for renew in 2 secs
-    dfs.setTokenToRenewIn2Sec(token4); 
-    System.out.println("token="+token4+" should be renewed for 2 secs");
+    Renewer.tokenToRenewIn2Sec = token4; 
+    LOG.info("token="+token4+" should be renewed for 2 secs");
     
     String nn4 = DelegationTokenRenewal.SCHEME + "://host4:0";
     ts.addToken(new Text(nn4), token4);
@@ -287,24 +306,23 @@ public class TestDelegationTokenRenewal 
     JobID jid2 = new JobID("job2",1);
     DelegationTokenRenewal.registerDelegationTokensForRenewal(jid2, ts, conf);
     DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jid2);
-    numberOfExpectedRenewals = dfs.getCounter(); // number of renewals so far
+    numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
     try {
       Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
     } catch (InterruptedException e) {}
-    System.out.println("Counter = " + dfs.getCounter() + ";t="+dfs.getToken());
+    System.out.println("Counter = " + Renewer.counter + ";t="+ 
+                       Renewer.lastRenewed);
     
     // counter and the token should stil be the old ones
     assertEquals("renew wasn't called as many times as expected",
-        numberOfExpectedRenewals, dfs.getCounter());
+        numberOfExpectedRenewals, Renewer.counter);
     
     // also renewing of the cancelled token should fail
-    boolean exception=false;
     try {
-      dfs.renewDelegationToken(token4);
+      token4.renew(conf);
+      fail("Renew of canceled token didn't fail");
     } catch (InvalidToken ite) {
       //expected
-      exception = true;
     }
-    assertTrue("Renew of canceled token didn't fail", exception);
   }
 }