You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/09/10 00:07:36 UTC

svn commit: r1167374 - in /hadoop/common/branches/branch-0.20-security: ./ src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/security/ src/core/org/apache/hadoop/security/token/ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/s...

Author: omalley
Date: Fri Sep  9 22:07:35 2011
New Revision: 1167374

URL: http://svn.apache.org/viewvc?rev=1167374&view=rev
Log:
MAPREDUCE-2764. Allow JobTracker to renew and cancel arbitrary token types,
including delegation tokens obtained via hftp. (omalley)

Added:
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/TokenRenewer.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/build.xml
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileSystem.java
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/SecurityUtil.java
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/Token.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep  9 22:07:35 2011
@@ -25,6 +25,9 @@ Release 0.20.205.0 - unreleased
     HDFS-1520. Lightweight NameNode operation recoverLease to trigger 
     lease recovery. (Hairong Kuang via dhruba)
 
+    MAPREDUCE-2764. Allow JobTracker to renew and cancel arbitrary token types,
+    including delegation tokens obtained via hftp. (omalley)
+
   BUG FIXES
 
     MAPREDUCE-2324. Removed usage of broken

Modified: hadoop/common/branches/branch-0.20-security/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/build.xml?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security/build.xml Fri Sep  9 22:07:35 2011
@@ -704,6 +704,13 @@
           <attribute name="Implementation-Vendor" value="Apache"/>
         </section>
       </manifest>
+      <service type="org.apache.hadoop.security.token.TokenRenewer">
+        <provider classname="org.apache.hadoop.hdfs.DFSClient$Renewer"/>
+        <provider classname="org.apache.hadoop.mapred.JobClient$Renewer"/>
+        <provider classname="org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer"/>
+        <provider classname="org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier$Renewer"/>
+        <provider classname="org.apache.hadoop.hdfs.HftpFileSystem$TokenManager"/>
+      </service>
       <fileset dir="${conf.dir}" includes="${jar.properties.list}" />
       <fileset file="${jar.extra.properties.list}" />
       <zipfileset dir="${build.webapps}" prefix="webapps"/>

Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileSystem.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileSystem.java Fri Sep  9 22:07:35 2011
@@ -89,6 +89,19 @@ public abstract class FileSystem extends
    */
   private Set<Path> deleteOnExit = new TreeSet<Path>();
 
+  /**
+   * This method adds a file system for testing so that we can find it later.
+   * It is only for testing.
+   * @param uri the uri to store it under
+   * @param conf the configuration to store it under
+   * @param fs the file system to store
+   * @throws IOException
+   */
+  public static void addFileSystemForTesting(URI uri, Configuration conf, 
+                                      FileSystem fs) throws IOException {
+    CACHE.map.put(new Cache.Key(uri, conf), fs);
+  }
+
   public static FileSystem get(final URI uri, final Configuration conf, 
       final String user)
   throws IOException, InterruptedException {

Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/SecurityUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/SecurityUtil.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/SecurityUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/SecurityUtil.java Fri Sep  9 22:07:35 2011
@@ -270,7 +270,8 @@ public class SecurityUtil {
    * @return "ip:port"
    */
   static String buildDTAuthority(InetSocketAddress addr) {
-    return buildDTAuthority(addr.getAddress().getHostAddress(), addr.getPort());
+    String host= addr.getAddress().getHostAddress();
+    return buildDTAuthority(host, addr.getPort());
   }
   
   /**

Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/Token.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/Token.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/Token.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/Token.java Fri Sep  9 22:07:35 2011
@@ -22,9 +22,14 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.ServiceLoader;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -36,10 +41,12 @@ import org.apache.hadoop.io.WritableUtil
  * The client-side form of the token.
  */
 public class Token<T extends TokenIdentifier> implements Writable {
+  public static final Log LOG = LogFactory.getLog(Token.class);
   private byte[] identifier;
   private byte[] password;
   private Text kind;
   private Text service;
+  private TokenRenewer renewer;
   
   /**
    * Construct a token given a token identifier and a secret manager for the
@@ -79,6 +86,17 @@ public class Token<T extends TokenIdenti
   }
 
   /**
+   * Clone a token.
+   * @param other the token to clone
+   */
+  public Token(Token<T> other) {
+    this.identifier = other.identifier;
+    this.password = other.password;
+    this.kind = other.kind;
+    this.service = other.service;
+  }
+
+  /**
    * Get the token identifier
    * @return the token identifier
    */
@@ -103,6 +121,16 @@ public class Token<T extends TokenIdenti
   }
 
   /**
+   * Set the token kind. This is only intended to be used by services that
+   * wrap another service's token, such as HFTP wrapping HDFS.
+   * @param newKind
+   */
+  @InterfaceAudience.Private
+  public void setKind(Text newKind) {
+    kind = newKind;
+  }
+
+  /**
    * Get the service on which the token is supposed to be used
    * @return the service name
    */
@@ -242,4 +270,88 @@ public class Token<T extends TokenIdenti
     buffer.append(service.toString());
     return buffer.toString();
   }
+  
+  private static ServiceLoader<TokenRenewer> renewers =
+      ServiceLoader.load(TokenRenewer.class);
+
+  private synchronized TokenRenewer getRenewer() throws IOException {
+    if (renewer != null) {
+      return renewer;
+    }
+    renewer = TRIVIAL_RENEWER;
+    for (TokenRenewer canidate: renewers) {
+      if (canidate.handleKind(this.kind)) {
+        renewer = canidate;
+        return renewer;
+      }
+    }
+    LOG.warn("No TokenRenewer defined for token kind " + this.kind);
+    return renewer;
+  }
+
+  /**
+   * Is this token managed so that it can be renewed or cancelled?
+   * @return true, if it can be renewed and cancelled.
+   */
+  public boolean isManaged() throws IOException {
+    return getRenewer().isManaged(this);
+  }
+
+  /**
+   * Renew this delegation token
+   * @return the new expiration time
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public long renew(Configuration conf
+                    ) throws IOException, InterruptedException {
+    return getRenewer().renew(this, conf);
+  }
+  
+  /**
+   * Cancel this delegation token
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void cancel(Configuration conf
+                     ) throws IOException, InterruptedException {
+    getRenewer().cancel(this, conf);
+  }
+  
+  /**
+   * A trivial renewer for token kinds that aren't managed. Sub-classes need
+   * to implement getKind for their token kind.
+   */
+  public static class TrivialRenewer extends TokenRenewer {
+    
+    // define the kind for this renewer
+    protected Text getKind() {
+      return null;
+    }
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return kind.equals(getKind());
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) {
+      return false;
+    }
+
+    @Override
+    public long renew(Token<?> token, Configuration conf) {
+      throw new UnsupportedOperationException("Token renewal is not supported "+
+                                              " for " + token.kind + " tokens");
+    }
+
+    @Override
+    public void cancel(Token<?> token, Configuration conf) throws IOException,
+        InterruptedException {
+      throw new UnsupportedOperationException("Token cancel is not supported " +
+          " for " + token.kind + " tokens");
+    }
+
+  }
+  private static final TokenRenewer TRIVIAL_RENEWER = new TrivialRenewer();
 }

Added: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/TokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/TokenRenewer.java?rev=1167374&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/TokenRenewer.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/TokenRenewer.java Fri Sep  9 22:07:35 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This is the interface for plugins that handle tokens.
+ */
+public abstract class TokenRenewer {
+
+  /**
+   * Does this renewer handle this kind of token?
+   * @param kind the kind of the token
+   * @return true if this renewer can renew it
+   */
+  public abstract boolean handleKind(Text kind);
+
+  /**
+   * Is the given token managed? Only managed tokens may be renewed or
+   * cancelled.
+   * @param token the token being checked
+   * @return true if the token may be renewed or cancelled
+   * @throws IOException
+   */
+  public abstract boolean isManaged(Token<?> token) throws IOException;
+  
+  /**
+   * Renew the given token.
+   * @return the new expiration time
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  public abstract long renew(Token<?> token,
+                             Configuration conf
+                             ) throws IOException, InterruptedException;
+  
+  /**
+   * Cancel the given token
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  public abstract void cancel(Token<?> token,
+                              Configuration conf
+                              ) throws IOException, InterruptedException;
+}

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Sep  9 22:07:35 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
 import org.apache.hadoop.hdfs.protocol.*;
@@ -42,6 +43,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.*;
 
 import org.apache.commons.logging.*;
@@ -298,17 +300,33 @@ public class DFSClient implements FSCons
     return result;
   }
 
+  /**
+   * Renew a delegation token
+   * @param token the token to renew
+   * @return the new expiration time
+   * @throws InvalidToken
+   * @throws IOException
+   * @deprecated Use Token.renew instead.
+   */
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
     try {
-      LOG.info("Renewing " + stringifyToken(token));
-      return namenode.renewDelegationToken(token);
+      return token.renew(conf);
+    } catch (InterruptedException ie) {
+      throw new RuntimeException("caught interrupted", ie);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(InvalidToken.class,
                                      AccessControlException.class);
     }
   }
 
+  /**
+   * Cancel a delegation token
+   * @param token the token to cancel
+   * @throws InvalidToken
+   * @throws IOException
+   * @deprecated Use Token.cancel instead.
+   */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
     try {
@@ -320,6 +338,55 @@ public class DFSClient implements FSCons
     }
   }
   
+  @InterfaceAudience.Private
+  public static class Renewer extends TokenRenewer {
+    
+    @Override
+    public boolean handleKind(Text kind) {
+      return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public long renew(Token<?> token, Configuration conf) throws IOException {
+      Token<DelegationTokenIdentifier> delToken = 
+          (Token<DelegationTokenIdentifier>) token;
+      LOG.info("Renewing " + stringifyToken(delToken));
+      ClientProtocol nn = 
+        createRPCNamenode(NameNode.getAddress(token.getService().toString()),
+                          conf, UserGroupInformation.getCurrentUser());
+      try {
+        return nn.renewDelegationToken(delToken);
+      } catch (RemoteException re) {
+        throw re.unwrapRemoteException(InvalidToken.class, 
+                                       AccessControlException.class);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token, Configuration conf) throws IOException {
+      Token<DelegationTokenIdentifier> delToken = 
+          (Token<DelegationTokenIdentifier>) token;
+      LOG.info("Cancelling " + stringifyToken(delToken));
+      ClientProtocol nn = 
+          createRPCNamenode(NameNode.getAddress(token.getService().toString()),
+                            conf, UserGroupInformation.getCurrentUser());
+        try {
+          nn.cancelDelegationToken(delToken);
+        } catch (RemoteException re) {
+          throw re.unwrapRemoteException(InvalidToken.class, 
+                                         AccessControlException.class);
+        }
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+    
+  }
+
   /**
    * Report corrupt blocks that were discovered by the client.
    */

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Sep  9 22:07:35 2011
@@ -607,10 +607,15 @@ public class DistributedFileSystem exten
    * @param token delegation token obtained earlier
    * @return the new expiration time
    * @throws IOException
+   * @deprecated Use Token.renew instead.
    */
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
-    return dfs.renewDelegationToken(token);
+    try {
+      return token.renew(getConf());
+    } catch (InterruptedException ie) {
+      throw new RuntimeException("Caught interrupted", ie);
+    }
   }
 
   /**
@@ -618,10 +623,15 @@ public class DistributedFileSystem exten
    * 
    * @param token delegation token
    * @throws IOException
+   * @deprecated Use Token.cancel instead.
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    dfs.cancelDelegationToken(token);
+    try {
+      token.cancel(getConf());
+    } catch (InterruptedException ie) {
+      throw new RuntimeException("Caught interrupted", ie);
+    }
   }
 
   /**

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java Fri Sep  9 22:07:35 2011
@@ -36,6 +36,7 @@ import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -60,6 +61,7 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.Progressable;
 import org.xml.sax.Attributes;
 import org.xml.sax.InputSource;
@@ -79,16 +81,19 @@ public class HftpFileSystem extends File
     HttpURLConnection.setFollowRedirects(true);
   }
 
-  private static final int DEFAULT_PORT = 50470;
+  public static final int DEFAULT_PORT = 50470;
+  public static final Text TOKEN_KIND = new Text("HFTP delegation");
   
   protected InetSocketAddress nnAddr;
   protected UserGroupInformation ugi; 
   private String nnHttpUrl;
-  private URI hdfsURI;
+  private Text hdfsServiceName;
+  private URI hftpURI;
 
   public static final String HFTP_TIMEZONE = "UTC";
   public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
-  private Token<DelegationTokenIdentifier> delegationToken;
+  private Token<?> delegationToken;
+  private Token<?> renewToken;
   public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
 
   public static final SimpleDateFormat getDateFormat() {
@@ -111,10 +116,9 @@ public class HftpFileSystem extends File
 
   @Override
   public String getCanonicalServiceName() {
-    return SecurityUtil.buildDTServiceName(hdfsURI, getDefaultPort());
+    return SecurityUtil.buildDTServiceName(hftpURI, getDefaultPort());
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public void initialize(final URI name, final Configuration conf) 
   throws IOException {
@@ -123,14 +127,20 @@ public class HftpFileSystem extends File
     this.ugi = UserGroupInformation.getCurrentUser();
 
     nnAddr = NetUtils.createSocketAddr(name.toString());
-    StringBuilder sb = new StringBuilder("https://");
+    StringBuilder sb = new StringBuilder();
     sb.append(NetUtils.normalizeHostName(name.getHost()));
     sb.append(":");
     sb.append(conf.getInt("dfs.https.port", DEFAULT_PORT));
-    nnHttpUrl = sb.toString();
+    String tail = sb.toString();
+    nnHttpUrl = "https://" + tail;
     
+    try {
+      hftpURI = new URI("hftp://" + tail);
+    } catch (URISyntaxException ue) {
+      throw new IOException("bad uri for hdfs", ue);
+    }
     String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
-      SecurityUtil.buildDTServiceName(name, DEFAULT_PORT);
+        SecurityUtil.buildDTServiceName(name, DEFAULT_PORT);
     LOG.debug("Trying to find DT for " + name + " using key=" + key + 
         "; conf=" + conf.get(key, ""));
     String nnServiceName = conf.get(key);
@@ -139,42 +149,70 @@ public class HftpFileSystem extends File
       nnPort = NetUtils.createSocketAddr(nnServiceName, 
                                          NameNode.DEFAULT_PORT).getPort();
     }
-
     sb = new StringBuilder("hdfs://");
     sb.append(nnAddr.getHostName());
     sb.append(":");
     sb.append(nnPort);
     try {
-      hdfsURI = new URI(sb.toString());
+      URI hdfsURI = new URI(sb.toString());
+      hdfsServiceName = new Text(SecurityUtil.buildDTServiceName(hdfsURI, 
+                                                                 nnPort));
     } catch (URISyntaxException ue) {
       throw new IOException("bad uri for hdfs", ue);
     }
-    
     if (UserGroupInformation.isSecurityEnabled()) {
-      
-      //try finding a token for this namenode (esp applicable for tasks
-      //using hftp). If there exists one, just set the delegationField
-      String canonicalName = getCanonicalServiceName();
+      String hftpServiceName = getCanonicalServiceName();
       for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
-        if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(t.getKind())&
-            t.getService().toString().equals(canonicalName)) {
-          LOG.debug("Found existing DT for " + name);
-          delegationToken = (Token<DelegationTokenIdentifier>) t;
-          break;
+        Text kind = t.getKind();
+        if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind)){
+          if (hdfsServiceName.equals(t.getService())) {
+            setDelegationToken(t);
+            break;
+          }
+        } else if (TOKEN_KIND.equals(kind)) {
+          if (hftpServiceName.equals(normalizeService(t.getService()
+                                       .toString()))) {
+            setDelegationToken(t);
+            break;
+          }
         }
       }
       
       //since we don't already have a token, go get one over https
       if (delegationToken == null) {
-        delegationToken = 
-          (Token<DelegationTokenIdentifier>) getDelegationToken(null);
+        setDelegationToken(getDelegationToken(null));
         renewer.addTokenToRenew(this);
+        LOG.debug("Created new DT for " + delegationToken.getService());
+      } else {
+        LOG.debug("Found existing DT for " + delegationToken.getService());        
       }
     }
   }
-  
+
+  private String normalizeService(String service) {
+    int colonIndex = service.indexOf(':');
+    if (colonIndex == -1) {
+      throw new IllegalArgumentException("Invalid service for hftp token: " + 
+                                         service);
+    }
+    String hostname = 
+        NetUtils.normalizeHostName(service.substring(0, colonIndex));
+    String port = service.substring(colonIndex + 1);
+    return hostname + ":" + port;
+  }
+
+  private <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+    renewToken = token;
+    // emulate the 203 usage of the tokens
+    // by setting the kind and service as if they were hdfs tokens
+    delegationToken = new Token<T>(token);
+    delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+    delegationToken.setService(hdfsServiceName);
+  }
+
   @Override
-  public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
+  public synchronized Token<?> getDelegationToken(final String renewer
+                                                  ) throws IOException {
     try {
       //Renew TGT if needed
       ugi.checkTGTAndReloginFromKeytab();
@@ -193,7 +231,6 @@ public class HftpFileSystem extends File
           for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
             LOG.debug("Got dt for " + getUri() + ";t.service="
                       +t.getService());
-            t.setService(new Text(getCanonicalServiceName()));
             return t;
           }
           return null;
@@ -636,7 +673,8 @@ public class HftpFileSystem extends File
     @Override
     public int compareTo(Delayed o) {
       if (o.getClass() != RenewAction.class) {
-        throw new IllegalArgumentException("Illegal comparision to non-RenewAction");
+        throw new IllegalArgumentException
+                  ("Illegal comparision to non-RenewAction");
       }
       RenewAction other = (RenewAction) o;
       return timestamp < other.timestamp ? -1 :
@@ -657,31 +695,20 @@ public class HftpFileSystem extends File
      * @return
      * @throws IOException
      */
-    @SuppressWarnings("unchecked")
     public boolean renew() throws IOException, InterruptedException {
       final HftpFileSystem fs = weakFs.get();
       if (fs != null) {
         synchronized (fs) {
-          fs.ugi.checkTGTAndReloginFromKeytab();
-          fs.ugi.doAs(new PrivilegedExceptionAction<Void>() {
-
-            @Override
-            public Void run() throws Exception {
-              try {
-                DelegationTokenFetcher.renewDelegationToken(fs.nnHttpUrl, 
-                                                            fs.delegationToken);
-              } catch (IOException ie) {
-                try {
-                  fs.delegationToken = 
-                    (Token<DelegationTokenIdentifier>) fs.getDelegationToken(null);
-                } catch (IOException ie2) {
-                  throw new IOException("Can't renew or get new delegation token ", 
-                                        ie);
-                }
-              }
-              return null;
-            } 
-          });
+          try {
+            fs.renewToken.renew(fs.getConf());
+          } catch (IOException ie) {
+            try {
+              fs.setDelegationToken(fs.getDelegationToken(null));
+            } catch (IOException ie2) {
+              throw new IOException("Can't renew or get new delegation token ", 
+                                    ie);
+            }
+          }
         }
       }
       return fs != null;
@@ -717,7 +744,7 @@ public class HftpFileSystem extends File
     }
 
     public void addTokenToRenew(HftpFileSystem fs) {
-      queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(),fs));
+      queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(), fs));
     }
 
     public void run() {
@@ -747,4 +774,44 @@ public class HftpFileSystem extends File
   static {
     renewer.start();
   }
+  
+  @InterfaceAudience.Private
+  public static class TokenManager extends TokenRenewer {
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return kind.equals(TOKEN_KIND);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public long renew(Token<?> token, 
+                      Configuration conf) throws IOException {
+      // update the kerberos credentials, if they are coming from a keytab
+      UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+      // use https to renew the token
+      return 
+        DelegationTokenFetcher.renewDelegationToken
+        ("https://" + token.getService().toString(), 
+         (Token<DelegationTokenIdentifier>) token);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token, 
+                       Configuration conf) throws IOException {
+      // update the kerberos credentials, if they are coming from a keytab
+      UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+      // use https to cancel the token
+      DelegationTokenFetcher.cancelDelegationToken
+        ("https://" + token.getService().toString(), 
+         (Token<DelegationTokenIdentifier>) token);
+    }
+    
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Fri Sep  9 22:07:35 2011
@@ -24,10 +24,12 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.EnumSet;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 
 public class BlockTokenIdentifier extends TokenIdentifier {
@@ -182,4 +184,12 @@ public class BlockTokenIdentifier extend
     
     return cache;
   }
+  
+  @InterfaceAudience.Private
+  public static class Renewer extends Token.TrivialRenewer {
+    @Override
+    protected Text getKind() {
+      return KIND_NAME;
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Fri Sep  9 22:07:35 2011
@@ -19,23 +19,24 @@ package org.apache.hadoop.hdfs.tools;
 
 import java.io.BufferedReader;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.PrintStream;
 import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.URI;
 import java.net.URL;
 import java.net.URLConnection;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
 import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
@@ -46,71 +47,112 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.GenericOptionsParser;
 
 /**
  * Fetch a DelegationToken from the current Namenode and store it in the
  * specified file.
  */
 public class DelegationTokenFetcher {
-  private static final String USAGE =
-    "fetchdt retrieves delegation tokens (optionally over http)\n" +
-    "and writes them to specified file.\n" +
-    "Usage: fetchdt [--webservice <namenode http addr>] <output filename>";
   
-  private final DistributedFileSystem dfs;
-  private final UserGroupInformation ugi;
-  private final DataOutputStream out;
-  private final Configuration conf;
   private static final Log LOG = 
     LogFactory.getLog(DelegationTokenFetcher.class);
+  private static final String WEBSERVICE = "webservice";
+  private static final String CANCEL = "cancel";
+  private static final String RENEW = "renew";
   
   static {
     // Enable Kerberos sockets
     System.setProperty("https.cipherSuites", "TLS_KRB5_WITH_3DES_EDE_CBC_SHA");
   }
 
+  private static void printUsage(PrintStream err) throws IOException {
+    err.println("fetchdt retrieves delegation tokens from the NameNode");
+    err.println();
+    err.println("fetchdt <opts> <token file>");
+    err.println("Options:");
+    err.println("  --webservice <url>  Url to contact NN on");
+    err.println("  --cancel            Cancel the delegation token");
+    err.println("  --renew             Renew the delegation token");
+    err.println();
+    GenericOptionsParser.printGenericCommandUsage(err);
+    System.exit(1);
+  }
+
+  private static Collection<Token<?>>
+    readTokens(Path file, Configuration conf) throws IOException{
+    Credentials creds = Credentials.readTokenStorageFile(file, conf);
+    return creds.getAllTokens();
+  }
+
   /**
    * Command-line interface
    */
   public static void main(final String [] args) throws Exception {
+    final Configuration conf = new Configuration();
+    Options fetcherOptions = new Options();
+    fetcherOptions.addOption(WEBSERVICE, true, 
+                             "HTTPS url to reach the NameNode at");
+    fetcherOptions.addOption(CANCEL, false, "cancel the token");
+    fetcherOptions.addOption(RENEW, false, "renew the token");
+    GenericOptionsParser parser =
+      new GenericOptionsParser(conf, fetcherOptions, args);
+    CommandLine cmd = parser.getCommandLine();
+    
+    // get options
+    final String webUrl =
+      cmd.hasOption(WEBSERVICE) ? cmd.getOptionValue(WEBSERVICE) : null;
+    final boolean cancel = cmd.hasOption(CANCEL);
+    final boolean renew = cmd.hasOption(RENEW);
+    String[] remaining = parser.getRemainingArgs();
+    
+    // check option validity
+    if (cancel && renew) {
+      System.err.println("ERROR: Only specify cancel or renew.");
+      printUsage(System.err);
+    }
+    if (remaining.length != 1 || remaining[0].charAt(0) == '-') {
+      System.err.println("ERROR: Must specify exacltly one token file");
+      printUsage(System.err);
+    }
+    // default to using the local file system
+    FileSystem local = FileSystem.getLocal(conf);
+    final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
+
     // Login the current user
-    UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Object>() {
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
       @Override
       public Object run() throws Exception {
         
-        if(args.length == 3 && "--webservice".equals(args[0])) {
-          getDTfromRemoteIntoFile(args[1], args[2]);
-          return null;
-        }
-        // avoid annoying mistake
-        if(args.length == 1 && "--webservice".equals(args[0])) {
-          System.out.println(USAGE);
-          return null;
-        }
-        if(args.length != 1 || args[0].isEmpty()) {
-          System.out.println(USAGE);
-          return null;
-        }
-        
-        DataOutputStream out = null;
-        
-        try {
-          Configuration conf = new Configuration();
-          DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
-          out = new DataOutputStream(new FileOutputStream(args[0]));
-          UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-          new DelegationTokenFetcher(dfs, out, ugi, conf).go();
-          
-          out.flush();
-          System.out.println("Succesfully wrote token of size " + 
-              out.size() + " bytes to "+ args[0]);
-        } catch (IOException ioe) {
-          System.out.println("Exception encountered:\n" +
-              StringUtils.stringifyException(ioe));
-        } finally {
-          if(out != null) out.close();
+        if (cancel) {
+          for(Token<?> token: readTokens(tokenFile, conf)) {
+            if (token.isManaged()) {
+              token.cancel(conf);
+              System.out.println("Cancelled token for " + token.getService());
+            }
+          }          
+        } else if (renew) {
+          for(Token<?> token: readTokens(tokenFile, conf)) {
+            if (token.isManaged()) {
+              token.renew(conf);
+              System.out.println("Renewed token for " + token.getService());
+            }
+          }          
+        } else {
+          if (webUrl != null) {
+            getDTfromRemote(webUrl, null).
+              writeTokenStorageFile(tokenFile, conf);
+            System.out.println("Fetched token via http for " + webUrl);
+          } else {
+            FileSystem fs = FileSystem.get(conf);
+            Token<?> token = fs.getDelegationToken(ugi.getShortUserName());
+            Credentials cred = new Credentials();
+            cred.addToken(token.getService(), token);
+            cred.writeTokenStorageFile(tokenFile, conf);
+            System.out.println("Fetched token for " + fs.getUri() + " into " +
+                               tokenFile);
+          }        
         }
         return null;
       }
@@ -118,35 +160,6 @@ public class DelegationTokenFetcher {
 
   }
   
-  public DelegationTokenFetcher(DistributedFileSystem dfs, 
-      DataOutputStream out, UserGroupInformation ugi, Configuration conf) {
-    checkNotNull("dfs", dfs); this.dfs = dfs;
-    checkNotNull("out", out); this.out = out;
-    checkNotNull("ugi", ugi); this.ugi = ugi;
-    checkNotNull("conf",conf); this.conf = conf;
-  }
-  
-  private void checkNotNull(String s, Object o) {
-    if(o == null) throw new IllegalArgumentException(s + " cannot be null.");
-  }
-
-  public void go() throws IOException {
-    String fullName = ugi.getUserName();
-    String shortName = ugi.getShortUserName();
-    Token<DelegationTokenIdentifier> token = 
-      dfs.getDelegationToken(new Text(fullName));
-    
-    // Reconstruct the ip:port of the Namenode
-    URI uri = FileSystem.getDefaultUri(conf);
-    String nnAddress = 
-      InetAddress.getByName(uri.getHost()).getHostAddress() + ":" + uri.getPort();
-    token.setService(new Text(nnAddress));
-    
-    Credentials ts = new Credentials();
-    ts.addToken(new Text(shortName), token);
-    ts.writeTokenStorageToStream(out);
-  }
-
   /**
    * Utility method to obtain a delegation token over http
    * @param nnHttpAddr Namenode http addr, such as http://namenode:50070
@@ -172,6 +185,12 @@ public class DelegationTokenFetcher {
       Credentials ts = new Credentials();
       dis = new DataInputStream(in);
       ts.readFields(dis);
+      for(Token<?> token: ts.getAllTokens()) {
+        token.setKind(HftpFileSystem.TOKEN_KIND);
+        token.setService(new Text(SecurityUtil.buildDTServiceName
+                                   (remoteURL.toURI(), 
+                                    HftpFileSystem.DEFAULT_PORT)));
+      }
       return ts;
     } catch (Exception e) {
       throw new IOException("Unable to obtain remote token", e);
@@ -278,20 +297,4 @@ public class DelegationTokenFetcher {
       throw ie;
     }
   }
-
-  /**
-   * Utility method to obtain a delegation token over http
-   * @param nnHttpAddr Namenode http addr, such as http://namenode:50070
-   * @param filename Name of file to store token in
-   */
-  static private void getDTfromRemoteIntoFile(String nnAddr, String filename) 
-  throws IOException {
-    Credentials ts = getDTfromRemote(nnAddr, null); 
-
-    DataOutputStream file = new DataOutputStream(new FileOutputStream(filename));
-    ts.writeTokenStorageToStream(file);
-    file.flush();
-    System.out.println("Successfully wrote token of " + file.size() 
-        + " bytes  to " + filename);
-  }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Sep  9 22:07:35 2011
@@ -43,6 +43,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -76,6 +77,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -471,7 +473,7 @@ public class JobClient extends Configure
     }        
   }
 
-  private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
+  private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
       Configuration conf) throws IOException {
     return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
         JobSubmissionProtocol.versionID, addr, 
@@ -479,6 +481,41 @@ public class JobClient extends Configure
         NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
   }
 
+  @InterfaceAudience.Private
+  public static class Renewer extends TokenRenewer {
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return DelegationTokenIdentifier.MAPREDUCE_DELEGATION_KIND.equals(kind);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public long renew(Token<?> token, Configuration conf
+                      ) throws IOException, InterruptedException {
+      InetSocketAddress addr = 
+          NetUtils.createSocketAddr(token.getService().toString());
+      JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+      return jt.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token, Configuration conf
+                       ) throws IOException, InterruptedException {
+      InetSocketAddress addr = 
+          NetUtils.createSocketAddr(token.getService().toString());
+      JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+      jt.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+    
+  }
+
   /**
    * Build a job client, connect to the indicated job tracker.
    * 

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Fri Sep  9 22:07:35 2011
@@ -19,10 +19,6 @@
 package org.apache.hadoop.mapreduce.security.token;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URI;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
@@ -38,17 +34,10 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VersionMismatchException;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
 
 
@@ -62,14 +51,14 @@ public class DelegationTokenRenewal {
    *
    */
   private static class DelegationTokenToRenew {
-    public final Token<DelegationTokenIdentifier> token;
+    public final Token<?> token;
     public final JobID jobId;
     public final Configuration conf;
     public long expirationDate;
     public TimerTask timerTask;
     
     public DelegationTokenToRenew(
-        JobID jId, Token<DelegationTokenIdentifier> t, 
+        JobID jId, Token<?> t, 
         Configuration newConf, long newExpirationDate) {
       token = t;
       jobId = jId;
@@ -117,10 +106,9 @@ public class DelegationTokenRenewal {
   
   private static class DelegationTokenCancelThread extends Thread {
     private static class TokenWithConf {
-      Token<DelegationTokenIdentifier> token;
+      Token<?> token;
       Configuration conf;
-      TokenWithConf(Token<DelegationTokenIdentifier> token,  
-          Configuration conf) {
+      TokenWithConf(Token<?> token, Configuration conf) {
         this.token = token;
         this.conf = conf;
       }
@@ -132,7 +120,7 @@ public class DelegationTokenRenewal {
       super("Delegation Token Canceler");
       setDaemon(true);
     }
-    public void cancelToken(Token<DelegationTokenIdentifier> token,  
+    public void cancelToken(Token<?> token,  
         Configuration conf) {
       TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
       while (!queue.offer(tokenWithConf)) {
@@ -147,17 +135,24 @@ public class DelegationTokenRenewal {
     }
 
     public void run() {
+      TokenWithConf tokenWithConf = null;
       while (true) {
-        TokenWithConf tokenWithConf = null;
         try {
           tokenWithConf = queue.take();
-          DistributedFileSystem dfs = getDFSForToken(tokenWithConf.token,  
-              tokenWithConf.conf);
+          final TokenWithConf current = tokenWithConf;
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Canceling token " + tokenWithConf.token.getService() +  
-                " for dfs=" + dfs);
+            LOG.debug("Canceling token " + tokenWithConf.token.getService());
           }
-          dfs.cancelDelegationToken(tokenWithConf.token);
+          // need to use doAs so that http can find the kerberos tgt
+          UserGroupInformation.getLoginUser()
+            .doAs(new PrivilegedExceptionAction<Void>(){
+
+              @Override
+              public Void run() throws Exception {
+                current.token.cancel(current.conf);
+                return null;
+              }
+            });
         } catch (IOException e) {
           LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +  
               StringUtils.stringifyException(e));
@@ -176,90 +171,28 @@ public class DelegationTokenRenewal {
     delegationTokens.add(t);
   }
   
-  // kind of tokens we currently renew
-  private static final Text kindHdfs = 
-    DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
-  
-  @SuppressWarnings("unchecked")
   public static synchronized void registerDelegationTokensForRenewal(
-      JobID jobId, Credentials ts, Configuration conf) {
+      JobID jobId, Credentials ts, Configuration conf) throws IOException {
     if(ts==null)
       return; //nothing to add
     
-    Collection <Token<? extends TokenIdentifier>> tokens = ts.getAllTokens();
+    Collection <Token<?>> tokens = ts.getAllTokens();
     long now = System.currentTimeMillis();
     
-    for(Token<? extends TokenIdentifier> t : tokens) {
-      // currently we only check for HDFS delegation tokens
-      // later we can add more different types.
-      if(! t.getKind().equals(kindHdfs)) {
-        continue; 
-      }
-      Token<DelegationTokenIdentifier> dt = 
-        (Token<DelegationTokenIdentifier>)t;
-      
+    for(Token<?> t : tokens) {
       // first renew happens immediately
-      DelegationTokenToRenew dtr = 
-        new DelegationTokenToRenew(jobId, dt, conf, now); 
+      if (t.isManaged()) {
+        DelegationTokenToRenew dtr = 
+          new DelegationTokenToRenew(jobId, t, conf, now); 
 
-      addTokenToList(dtr);
+        addTokenToList(dtr);
       
-      setTimerForTokenRenewal(dtr, true);
-      LOG.info("registering token for renewal for service =" + dt.getService()+
-          " and jobID = " + jobId);
-    }
-  }
-  
-  protected static long renewDelegationTokenOverHttps(
-      final Token<DelegationTokenIdentifier> token, final Configuration conf) 
-  throws InterruptedException, IOException{
-    final String httpAddress = getHttpAddressForToken(token, conf);
-    
-    Long expDate = (Long) UserGroupInformation.getLoginUser().doAs(
-        new PrivilegedExceptionAction<Long>() {
-      public Long run() throws IOException {
-        return DelegationTokenFetcher.renewDelegationToken(httpAddress, token);  
-      }
-    });
-    LOG.info("Renew over HTTP done. addr="+httpAddress+";res="+expDate);
-    return expDate;
-  }
-  
-  private static long renewDelegationToken(DelegationTokenToRenew dttr) 
-  throws Exception {
-    long newExpirationDate=System.currentTimeMillis()+3600*1000;
-    Token<DelegationTokenIdentifier> token = dttr.token;
-    Configuration conf = dttr.conf;
-    
-    if(token.getKind().equals(kindHdfs)) {
-      try {
-        try {
-          DistributedFileSystem dfs = getDFSForToken(token, conf);
-          newExpirationDate = dfs.renewDelegationToken(token);
-        } catch(VersionMismatchException vme) {
-          // if there is a version mismatch we try over https
-          LOG.info("Delegation token renew for t=" + token.getService() +
-              " failed with VersionMissmaptch:" + vme.toString()+". Trying over https");
-          renewDelegationTokenOverHttps(token, conf);
-        }
-      }catch (InvalidToken ite) {
-        LOG.warn("invalid token - not scheduling for renew: " + ite.getLocalizedMessage());
-        removeFailedDelegationToken(dttr);
-        throw new IOException("failed to renew token", ite);
-      } catch (AccessControlException ioe) {
-        LOG.warn("failed to renew token:"+token, ioe);
-        removeFailedDelegationToken(dttr);
-        throw new IOException("failed to renew token", ioe);
-      } catch (Exception e) {
-        LOG.warn("failed to renew token:"+token, e);
-        // returns default expiration date
+        setTimerForTokenRenewal(dtr, true);
+        LOG.info("registering token for renewal for service =" + t.getService()+
+                 " and jobID = " + jobId);
       }
-    } else {
-      throw new Exception("unknown token type to renew: "+token.getKind());
     }
-    return newExpirationDate;
   }
-
   
   /**
    * Task - to renew a token
@@ -272,74 +205,36 @@ public class DelegationTokenRenewal {
     
     @Override
     public void run() {
-      Token<DelegationTokenIdentifier> token = dttr.token;
-      long newExpirationDate=0;
+      Token<?> token = dttr.token;
       try {
-        newExpirationDate = renewDelegationToken(dttr);
+        // need to use doAs so that http can find the kerberos tgt
+        dttr.expirationDate = UserGroupInformation.getLoginUser()
+          .doAs(new PrivilegedExceptionAction<Long>(){
+
+          @Override
+          public Long run() throws Exception {
+            return dttr.token.renew(dttr.conf);
+          }
+        });
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("renewing for:" + token.getService() + ";newED=" + 
+                    dttr.expirationDate);
+        }
+        setTimerForTokenRenewal(dttr, false);// set the next one
       } catch (Exception e) {
-        return; // message logged in renewDT method
+        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
+        removeFailedDelegationToken(dttr);
       }
-      if (LOG.isDebugEnabled())
-        LOG.debug("renewing for:"+token.getService()+";newED=" + 
-            newExpirationDate);
-      
-      // new expiration date
-      dttr.expirationDate = newExpirationDate;
-      setTimerForTokenRenewal(dttr, false);// set the next one
     }
   }
   
-  private static String getHttpAddressForToken(
-      Token<DelegationTokenIdentifier> token, final Configuration conf) 
-  throws IOException {
-    
-    String[] ipaddr = token.getService().toString().split(":");
-    
-    InetAddress iaddr = InetAddress.getByName(ipaddr[0]);
-    String dnsName = iaddr.getCanonicalHostName();
-    String httpsPort = conf.get("dfs.https.port", "50470");
-    
-    // always use https (it is for security only)
-    return "https://" + dnsName+":"+httpsPort;
-  }
-    
-  private static DistributedFileSystem getDFSForToken(
-      Token<DelegationTokenIdentifier> token, final Configuration conf) 
-  throws Exception {
-    DistributedFileSystem dfs = null;
-    try {
-      //TODO: The service is usually an IPaddress:port. We convert
-      //it to dns name and then obtain the filesystem just so that
-      //we reuse the existing filesystem handle (that the jobtracker
-      //might have for this namenode; the namenode is usually
-      //specified as the dns name in the jobtracker).
-      //THIS IS A WORKAROUND FOR NOW. NEED TO SOLVE THIS PROBLEM 
-      //IN A BETTER WAY.
-      String[] ipaddr = token.getService().toString().split(":");
-      InetAddress iaddr = InetAddress.getByName(ipaddr[0]);
-      String dnsName = iaddr.getCanonicalHostName();
-      final URI uri = new URI (SCHEME + "://" + dnsName+":"+ipaddr[1]);
-      dfs = (DistributedFileSystem)
-      UserGroupInformation.getLoginUser().doAs(
-          new PrivilegedExceptionAction<DistributedFileSystem>() {
-        public DistributedFileSystem run() throws IOException {
-          return (DistributedFileSystem) FileSystem.get(uri, conf);  
-        }
-      });
-
-      
-    } catch (Exception e) {
-      LOG.warn("Failed to create a dfs to renew for:" + token.getService(), e);
-      throw e;
-    } 
-    return dfs;
-  }
-  
   /**
    * set task to renew the token
    */
-  private static void setTimerForTokenRenewal(
-      DelegationTokenToRenew token, boolean firstTime) {
+  private static 
+  void setTimerForTokenRenewal(DelegationTokenToRenew token, 
+                               boolean firstTime) throws IOException {
       
     // calculate timer time
     long now = System.currentTimeMillis();
@@ -351,15 +246,11 @@ public class DelegationTokenRenewal {
       renewIn = now + expiresIn - expiresIn/10; // little bit before the expiration
     }
     
-    try {
-      // need to create new task every time
-      TimerTask tTask = new RenewalTimerTask(token);
-      token.setTimerTask(tTask); // keep reference to the timer
-
-      renewalTimer.schedule(token.timerTask, new Date(renewIn));
-    } catch (Exception e) {
-      LOG.warn("failed to schedule a task, token will not renew more", e);
-    }
+    // need to create new task every time
+    TimerTask tTask = new RenewalTimerTask(token);
+    token.setTimerTask(tTask); // keep reference to the timer
+
+    renewalTimer.schedule(token.timerTask, new Date(renewIn));
   }
 
   /**
@@ -372,12 +263,7 @@ public class DelegationTokenRenewal {
   
   // cancel a token
   private static void cancelToken(DelegationTokenToRenew t) {
-    Token<DelegationTokenIdentifier> token = t.token;
-    Configuration conf = t.conf;
-    
-    if(token.getKind().equals(kindHdfs)) {
-      dtCancelThread.cancelToken(token, conf);
-    }
+    dtCancelThread.cancelToken(t.token, t.conf);
   }
   
   /**

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java Fri Sep  9 22:07:35 2011
@@ -22,7 +22,9 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -31,7 +33,7 @@ import org.apache.hadoop.security.UserGr
  */
 public class JobTokenIdentifier extends TokenIdentifier {
   private Text jobid;
-  final static Text KIND_NAME = new Text("mapreduce.job");
+  public final static Text KIND_NAME = new Text("mapreduce.job");
   
   /**
    * Default constructor
@@ -82,4 +84,12 @@ public class JobTokenIdentifier extends 
   public void write(DataOutput out) throws IOException {
     jobid.write(out);
   }
+
+  @InterfaceAudience.Private
+  public static class Renewer extends Token.TrivialRenewer {
+    @Override
+    protected Text getKind() {
+      return KIND_NAME;
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java Fri Sep  9 22:07:35 2011
@@ -28,8 +28,9 @@ import org.apache.hadoop.security.token.
 //@InterfaceAudience.Private
 public class DelegationTokenIdentifier 
   extends AbstractDelegationTokenIdentifier {
-static final Text MAPREDUCE_DELEGATION_KIND = 
-  new Text("MAPREDUCE_DELEGATION_TOKEN");
+  
+  public static final Text MAPREDUCE_DELEGATION_KIND = 
+    new Text("MAPREDUCE_DELEGATION_TOKEN");
 
 /**
  * Create an empty delegation token identifier for reading into.

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java Fri Sep  9 22:07:35 2011
@@ -17,42 +17,41 @@
  */
 package org.apache.hadoop.tools;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestDelegationTokenFetcher {
   private DistributedFileSystem dfs;
-  private DataOutputStream out;
-  private UserGroupInformation ugi;
   private Configuration conf;
+  private URI uri;
+  private static final String SERVICE_VALUE = "localhost:2005";
 
   @Before 
-  public void init() {
+  public void init() throws URISyntaxException, IOException {
     dfs = mock(DistributedFileSystem.class);
-    out = mock(DataOutputStream.class);
-    ugi = mock(UserGroupInformation.class);
     conf = new Configuration();
+    uri = new URI("hdfs://" + SERVICE_VALUE);
+    FileSystem.addFileSystemForTesting(uri, conf, dfs);
   }
   
   /**
@@ -61,68 +60,36 @@ public class TestDelegationTokenFetcher 
    */
   @Test
   public void expectedTokenIsRetrievedFromDFS() throws Exception {
-    final String LONG_NAME = "TheDoctor@TARDIS";
-    final String SHORT_NAME = "TheDoctor";
-    final String SERVICE_VALUE = "localhost:2005";
-    URI uri = new URI("hdfs://" + SERVICE_VALUE);
-    FileSystem.setDefaultUri(conf, uri);
-    
-    // Mock out the user's long and short names.
-    when(ugi.getUserName()).thenReturn(LONG_NAME);
-    when(ugi.getShortUserName()).thenReturn(SHORT_NAME);
-    
+    final byte[] ident = new byte[]{1,2,3,4};
+    final byte[] pw = new byte[]{42};
+    final Text kind = new Text("MY-KIND");
+    final Text service = new Text(uri.toString());
+
     // Create a token for the fetcher to fetch, wire NN to return it when asked
     // for this particular user.
-    Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
-    when(dfs.getDelegationToken(eq(new Text(LONG_NAME)))).thenReturn(t);
- 
-    // Now, actually let the TokenFetcher go fetch the token.
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    out = new DataOutputStream(baos);
-    new DelegationTokenFetcher(dfs, out, ugi, conf).go();
-    
-    // now read the data back in and verify correct values
-    Credentials ts = new Credentials();
-    DataInputStream dis = 
-      new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    Token<DelegationTokenIdentifier> t = 
+      new Token<DelegationTokenIdentifier>(ident, pw, kind, service);
+    when(dfs.getDelegationToken((String) null)).thenReturn(t);
+    when(dfs.renewDelegationToken(eq(t))).thenReturn(1000L);
     
-    ts.readTokenStorageStream(dis);
-    Token<? extends TokenIdentifier> newToken = ts.getToken(new Text(SHORT_NAME));
-    
-    assertEquals("Should only be one token in storage", ts.numberOfTokens(), 1);
-    assertEquals("Service value should have survived", 
-        "127.0.0.1:2005", newToken.getService().toString());
+    DelegationTokenFetcher.main(new String[]{"-fs", uri.toString(), 
+                                             "file.dta"});
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path p = new Path(fs.getWorkingDirectory(), "file.dta");
+    Credentials creds = Credentials.readTokenStorageFile(p , conf);
+    Iterator<Token<?>> itr = creds.getAllTokens().iterator();
+    // make sure we got back exactly the 1 token we expected
+    assertTrue(itr.hasNext());
+    assertEquals(t, itr.next());
+    assertTrue(!itr.hasNext());
+
+    DelegationTokenFetcher.main(new String[]{"-fs", uri.toString(), 
+                                             "--renew", "file.dta"});
+
+    DelegationTokenFetcher.main(new String[]{"-fs", uri.toString(), 
+                                             "--cancel", "file.dta"});
+    verify(dfs).renewDelegationToken(eq(t));
+    verify(dfs).cancelDelegationToken(eq(t));
   }
 
-  private void checkWithNullParam(String s) {
-    try {
-      new DelegationTokenFetcher(dfs, out, ugi, conf);
-    } catch (IllegalArgumentException iae) {
-      assertEquals("Expected exception message not received", 
-          s + " cannot be null.", iae.getMessage());
-      return; // received expected exception. We're good.
-    }
-    fail("null parameter should have failed.");
-  }
-  
-  @Test
-  public void dfsCannotBeNull() {
-    dfs = null;
-    String s = "dfs";
-    checkWithNullParam(s);
-  }
-
-  @Test
-  public void dosCannotBeNull() {
-    out = null;
-    String s = "out";
-    checkWithNullParam(s);
-  }
-  
-  @Test
-  public void ugiCannotBeNull() {
-    ugi = null;
-    String s = "ugi";
-    checkWithNullParam(s);
-  }
 }