You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:19:59 UTC

svn commit: r1077487 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/fs/ core/org/apache/hadoop/security/ hdfs/org/apache/hadoop/hdfs/ hdfs/org/apache/hadoop/hdfs/server/namenode/ hdfs/org/apache/hadoop/hdfs/tools/...

Author: omalley
Date: Fri Mar  4 04:19:59 2011
New Revision: 1077487

URL: http://svn.apache.org/viewvc?rev=1077487&view=rev
Log:
commit 767074ea6d22994f05496f4f58d49fd33799c7f1
Author: Devaraj Das <dd...@yahoo-inc.com>
Date:   Thu Jun 3 14:45:29 2010 -0700

    HDFS-1007 from https://issues.apache.org/jira/secure/attachment/12446280/hdfs-1007-long-running-hftp-client.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    HDFS-1007. makes long running servers using hftp work. Also has some
    +    refactoring in the MR code to do with handling of delegation tokens.
    +    (oom & ddas)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DelegationTokenServlet.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FSDataInputStream.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FSDataInputStream.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FSDataInputStream.java Fri Mar  4 04:19:59 2011
@@ -22,7 +22,7 @@ import java.io.*;
 /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
  * and buffers input through a {@link BufferedInputStream}. */
 public class FSDataInputStream extends DataInputStream
-    implements Seekable, PositionedReadable {
+    implements Seekable, PositionedReadable, Closeable {
 
   public FSDataInputStream(InputStream in)
     throws IOException {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java Fri Mar  4 04:19:59 2011
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.security.PrivilegedExceptionAction;
@@ -38,11 +37,12 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 
 /****************************************************************
  * An abstract base class for a fairly generic filesystem.  It
@@ -145,6 +145,22 @@ public abstract class FileSystem extends
   /** Returns a URI whose scheme and authority identify this FileSystem.*/
   public abstract URI getUri();
   
+  /**
+   * Get the default port for this file system.
+   * @return the default port or 0 if there isn't one
+   */
+  protected int getDefaultPort() {
+    return 0;
+  }
+
+  /**
+   * Get a canonical name for this file system.
+   * @return a URI string that uniquely identifies this file system
+   */
+  public String getCanonicalServiceName() {
+    return SecurityUtil.buildDTServiceName(getUri(), getDefaultPort());
+  }
+  
   /** @deprecated call #getUri() instead.*/
   public String getName() { return getUri().toString(); }
 
@@ -1112,6 +1128,15 @@ public abstract class FileSystem extends
       .makeQualified(this);
   }
 
+  /**
+   * Get a new delegation token for this file system.
+   * @param renewer the account name that is allowed to renew the token.
+   * @return a new delegation token; this base implementation always returns null
+   * @throws IOException never thrown by this base implementation
+   */
+  public Token<?> getDelegationToken(String renewer) throws IOException {
+    return null;
+  }
 
   /**
    * Set the current working directory for the given file system. All relative

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java Fri Mar  4 04:19:59 2011
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -119,15 +120,19 @@ public class Credentials implements Writ
    * @param conf
    * @throws IOException
    */
-  public void readTokenStorageFile(Path filename, 
-                                   Configuration conf) throws IOException {
-    FSDataInputStream in = filename.getFileSystem(conf).open(filename);
+  public static Credentials readTokenStorageFile(Path filename, 
+                                                 Configuration conf
+                                                 ) throws IOException {
+    FSDataInputStream in = null;
+    Credentials credentials = new Credentials();
     try {
-    readTokenStorageStream(in);
+      in = filename.getFileSystem(conf).open(filename);
+      credentials.readTokenStorageStream(in);
+      in.close();
+      return credentials;
     } catch(IOException ioe) {
+      IOUtils.cleanup(LOG, in);
       throw new IOException("Exception reading " + filename, ioe);
-    } finally {
-      in.close();
     }
   }
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java Fri Mar  4 04:19:59 2011
@@ -460,8 +460,8 @@ public class UserGroupInformation {
         if (fileLocation != null && isSecurityEnabled()) {
           // load the token storage file and put all of the tokens into the
           // user.
-          Credentials cred = new Credentials();
-          cred.readTokenStorageFile(new Path("file:///" + fileLocation), conf);
+          Credentials cred = Credentials.readTokenStorageFile(
+              new Path("file:///" + fileLocation), conf);
           for (Token<?> token: cred.getAllTokens()) {
             loginUser.addToken(token);
           }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar  4 04:19:59 2011
@@ -265,14 +265,34 @@ public class DFSClient implements FSCons
     }
   }
 
+  /** Describe a token (kind, sequence number, user, service) for log messages */
+  public static String stringifyToken(Token<DelegationTokenIdentifier> token
+                                      ) throws IOException {
+    DelegationTokenIdentifier ident = new DelegationTokenIdentifier();
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);  
+    ident.readFields(in);  // decode the identifier from the token's raw bytes
+    String str = ident.getKind() + " token " + ident.getSequenceNumber() + 
+                 " for " + ident.getUser().getShortUserName();
+    if (token.getService().getLength() > 0) {  // mention the service only if set
+      return (str + " on " + token.getService());
+    } else {
+      return str;
+    }
+  }
+
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
-    return namenode.getDelegationToken(renewer);
+    Token<DelegationTokenIdentifier> result =
+      namenode.getDelegationToken(renewer);
+    LOG.info("Created " + stringifyToken(result));
+    return result;
   }
 
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
     try {
+      LOG.info("Renewing " + stringifyToken(token));
       return namenode.renewDelegationToken(token);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(InvalidToken.class,
@@ -283,6 +303,7 @@ public class DFSClient implements FSCons
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
     try {
+      LOG.info("Cancelling " + stringifyToken(token));
       namenode.cancelDelegationToken(token);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(InvalidToken.class,

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Mar  4 04:19:59 2011
@@ -537,7 +537,22 @@ public class DistributedFileSystem exten
       ) throws IOException {
     dfs.setTimes(getPathName(p), mtime, atime);
   }
-  
+
+  @Override
+  protected int getDefaultPort() {
+    return NameNode.DEFAULT_PORT;
+  }
+
+  @Override
+  public 
+  Token<DelegationTokenIdentifier> getDelegationToken(String renewer
+                                                      ) throws IOException {
+    Token<DelegationTokenIdentifier> result =
+      dfs.getDelegationToken(renewer == null ? null : new Text(renewer));
+    result.setService(new Text(getCanonicalServiceName()));
+    return result;
+  }
+
   /** 
    * Delegation Token Operations
    * These are DFS only operations.
@@ -549,7 +564,9 @@ public class DistributedFileSystem exten
    * @param renewer Name of the designated renewer for the token
    * @return Token<DelegationTokenIdentifier>
    * @throws IOException
+   * @deprecated use {@link #getDelegationToken(String)}
    */
+  @Deprecated
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
     return dfs.getDelegationToken(renewer);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java Fri Mar  4 04:19:59 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.ref.WeakReference;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -33,6 +34,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Random;
 import java.util.TimeZone;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -45,6 +49,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
@@ -75,13 +80,16 @@ public class HftpFileSystem extends File
     HttpURLConnection.setFollowRedirects(true);
   }
 
+  private static final int DEFAULT_PORT = 50470;
+  
   protected InetSocketAddress nnAddr;
   protected UserGroupInformation ugi; 
+  private String nnHttpUrl;
+  private URI hdfsURI;
 
   public static final String HFTP_TIMEZONE = "UTC";
   public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
-  private Token<? extends TokenIdentifier> delegationToken;
-  public static final String HFTP_RENEWER = "fs.hftp.renewer";
+  private Token<DelegationTokenIdentifier> delegationToken;
   public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
 
   public static final SimpleDateFormat getDateFormat() {
@@ -98,6 +106,17 @@ public class HftpFileSystem extends File
     };
 
   @Override
+  protected int getDefaultPort() {
+    return DEFAULT_PORT;
+  }
+
+  @Override
+  public String getCanonicalServiceName() {
+    return SecurityUtil.buildDTServiceName(hdfsURI, getDefaultPort());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
   public void initialize(final URI name, final Configuration conf) 
   throws IOException {
     super.initialize(name, conf);
@@ -105,66 +124,82 @@ public class HftpFileSystem extends File
     this.ugi = UserGroupInformation.getCurrentUser();
 
     nnAddr = NetUtils.createSocketAddr(name.toString());
+    StringBuilder sb = new StringBuilder("https://");
+    sb.append(NetUtils.normalizeHostName(name.getHost()));
+    sb.append(":");
+    sb.append(conf.getInt("dfs.https.port", DEFAULT_PORT));
+    nnHttpUrl = sb.toString();
+    
+    String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
+      SecurityUtil.buildDTServiceName(name, DEFAULT_PORT);
+    LOG.debug("Trying to find DT for " + name + " using key=" + key + 
+        "; conf=" + conf.get(key, ""));
+    String nnServiceName = conf.get(key);
+    int nnPort = NameNode.DEFAULT_PORT;
+    if (nnServiceName != null) {
+      nnPort = NetUtils.createSocketAddr(nnServiceName, 
+                                         NameNode.DEFAULT_PORT).getPort();
+    }
+
+    sb = new StringBuilder("hdfs://");
+    sb.append(nnAddr.getHostName());
+    sb.append(":");
+    sb.append(nnPort);
+    try {
+      hdfsURI = new URI(sb.toString());
+    } catch (URISyntaxException ue) {
+      throw new IOException("bad uri for hdfs", ue);
+    }
     
     if (UserGroupInformation.isSecurityEnabled()) {
-      // configuration has the actual service name for this url. Build the key 
-      // and get it.
-      final String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
-      SecurityUtil.buildDTServiceName(name, NameNode.DEFAULT_PORT);
-
-      LOG.debug("Trying to find DT for " + name + " using key=" + key + 
-          "; conf=" + conf.get(key, ""));
-      Text nnServiceNameText = new Text(conf.get(key, ""));
       
-      Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
       //try finding a token for this namenode (esp applicable for tasks
       //using hftp). If there exists one, just set the delegationField
-      for (Token<? extends TokenIdentifier> t : tokens) {
-        if ((t.getService()).equals(nnServiceNameText)) {
+      String canonicalName = getCanonicalServiceName();
+      for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
+        if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(t.getKind())&
+            t.getService().toString().equals(canonicalName)) {
           LOG.debug("Found existing DT for " + name);
-          delegationToken = t;
-          return;
+          delegationToken = (Token<DelegationTokenIdentifier>) t;
+          break;
         }
       }
       //since we don't already have a token, go get one over https
-      try {
-        ugi.doAs(new PrivilegedExceptionAction<Object>() {
-          public Object run() throws IOException {
-            StringBuffer sb = new StringBuffer();
-            //try https (on http we NEVER get a delegation token)
-            String nnHttpUrl = "https://" + 
-            (sb.append(NetUtils.normalizeHostName(name.getHost()))
-                .append(":").append(conf.getInt("dfs.https.port", 50470))).
-                toString();
-            Credentials c;
-            try {
-              c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, 
-                  conf.get(HFTP_RENEWER));
-            } catch (Exception e) {
-              LOG.info("Couldn't get a delegation token from " + nnHttpUrl + 
-              " using https.");
-              LOG.debug("error was ", e);
-              //Maybe the server is in unsecure mode (that's bad but okay)
-              return null;
-            }
-            for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
-              //the service field is already set and so setService 
-              //is not required
-              delegationToken = t;
-              LOG.debug("Got dt for " + getUri() + ";t.service="
-                  +t.getService());
-            }
-            return null;
-          }
-        });
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+      if (delegationToken == null) {
+        delegationToken = 
+          (Token<DelegationTokenIdentifier>) getDelegationToken(null);
+        renewer.addTokenToRenew(this);
       }
     }
   }
   
-  public Token<? extends TokenIdentifier> getDelegationToken() {
-    return delegationToken;
+  @Override
+  public Token<?> getDelegationToken(final String renewer) throws IOException {
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
+        public Token<?> run() throws IOException {
+          Credentials c;
+          try {
+            c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
+          } catch (Exception e) {
+            LOG.info("Couldn't get a delegation token from " + nnHttpUrl + 
+            " using https.");
+            LOG.debug("error was ", e);
+            //Maybe the server is in unsecure mode (that's bad but okay)
+            return null;
+          }
+          for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
+            LOG.debug("Got dt for " + getUri() + ";t.service="
+                      +t.getService());
+            t.setService(new Text(getCanonicalServiceName()));
+            return t;
+          }
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -214,10 +249,12 @@ public class HftpFileSystem extends File
   protected String updateQuery(String query) throws IOException {
     String tokenString = null;
     if (UserGroupInformation.isSecurityEnabled()) {
-      if (delegationToken != null) {
-        tokenString = delegationToken.encodeToUrlString();
-        return (query + JspHelper.SET_DELEGATION + tokenString);
-      } // else we are talking to an unsecure cluster
+      synchronized (this) {
+        if (delegationToken != null) {
+          tokenString = delegationToken.encodeToUrlString();
+          return (query + JspHelper.SET_DELEGATION + tokenString);
+        } // else we are talking to an insecure cluster
+      }
     }
     return query;
   }
@@ -537,4 +574,145 @@ public class HftpFileSystem extends File
     final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
     return cs != null? cs: super.getContentSummary(f);
   }
+  
+  /**
+   * An action that will renew and replace the hftp file system's delegation
+   * tokens automatically.
+   */
+  private static class RenewAction implements Delayed {
+    // when should the renew happen (milliseconds, System.currentTimeMillis() clock)
+    private long timestamp;
+    // a weak reference to the file system so that it can be garbage collected
+    private final WeakReference<HftpFileSystem> weakFs;
+
+    RenewAction(long timestamp, HftpFileSystem fs) {
+      this.timestamp = timestamp;
+      this.weakFs = new WeakReference<HftpFileSystem>(fs);
+    }
+
+    /**
+     * Get the delay until this event should happen, converted to the given unit.
+     */
+    @Override
+    public long getDelay(TimeUnit unit) {
+      long millisLeft = timestamp - System.currentTimeMillis();
+      return unit.convert(millisLeft, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Compare two events in the same queue; the earlier renewal sorts first.
+     */
+    @Override
+    public int compareTo(Delayed o) {
+      if (o.getClass() != RenewAction.class) {
+        throw new IllegalArgumentException("Illegal comparison to non-RenewAction");
+      }
+      RenewAction other = (RenewAction) o;
+      return timestamp < other.timestamp ? -1 :
+        (timestamp == other.timestamp ? 0 : 1);
+    }
+
+    /**
+     * Set a new time for the renewal. Can only be called when the action
+     * is not in the queue.
+     * @param newTime the new time
+     */
+    public void setNewTime(long newTime) {
+      timestamp = newTime;
+    }
+
+    /**
+     * Renew or replace the delegation token for this file system.
+     * @return true if the file system still exists, false if it has been collected
+     * @throws IOException if the token can neither be renewed nor replaced
+     * @throws InterruptedException if the doAs call is interrupted
+     */
+    @SuppressWarnings("unchecked")
+    public boolean renew() throws IOException, InterruptedException {
+      final HftpFileSystem fs = weakFs.get();
+      if (fs != null) {
+        synchronized (fs) {
+          fs.ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              try {
+                DelegationTokenFetcher.renewDelegationToken(fs.nnHttpUrl, 
+                                                            fs.delegationToken);
+              } catch (IOException ie) {
+                try {
+                  fs.delegationToken = 
+                    (Token<DelegationTokenIdentifier>) fs.getDelegationToken(null);
+                } catch (IOException ie2) {
+                  LOG.warn("Can't get a replacement delegation token", ie2);
+                  throw new IOException("Can't renew or get new delegation token ", ie);
+                }
+              }
+              return null;
+            } 
+          });
+        }
+      }
+      return fs != null;
+    }
+    
+    public String toString() {
+      StringBuilder result = new StringBuilder();
+      HftpFileSystem fs = weakFs.get();
+      if (fs == null) {
+        return "evaporated token renew";
+      }
+      synchronized (fs) {
+        result.append(fs.delegationToken);
+      }
+      result.append(" renew in ");
+      result.append(getDelay(TimeUnit.SECONDS));
+      result.append(" secs");
+      return result.toString();
+    }
+  }
+
+  /**
+   * A daemon thread that takes each due renew action off a delay queue and runs it.
+   */
+  private static class RenewerThread extends Thread {
+    private final DelayQueue<RenewAction> queue = new DelayQueue<RenewAction>();
+    // wait for 95% of a day (in milliseconds) between renewals
+    private static final int RENEW_CYCLE = (int) (0.95 * 24 * 60 * 60 * 1000);
+
+    public RenewerThread() {
+      super("HFTP Delegation Token Renewer");
+      setDaemon(true);
+    }
+
+    public void addTokenToRenew(HftpFileSystem fs) {
+      queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(), fs));
+    }
+
+    public void run() {
+      RenewAction action = null;
+      while (true) {
+        try {
+          action = queue.take();  // blocks until the earliest action is due
+          if (action.renew()) {  // false once the file system is gone
+            action.setNewTime(RENEW_CYCLE + System.currentTimeMillis());
+            queue.add(action);
+          }
+          action = null;
+        } catch (InterruptedException ie) {  // interrupted: stop renewing
+          return;
+        } catch (Exception ie) {  // NOTE: the failing action is not re-queued
+          if (action != null) {
+            LOG.warn("Failure to renew token " + action, ie);
+          } else {
+            LOG.warn("Failure in renew queue", ie);
+          }
+        }
+      }
+    }
+  }
+
+  private static RenewerThread renewer = new RenewerThread();
+  static {
+    renewer.start();
+  }
 }

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java?rev=1077487&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java Fri Mar  4 04:19:59 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.security.PrivilegedExceptionAction;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Cancel delegation tokens over http for use in hftp.
+ */
+@SuppressWarnings("serial")
+public class CancelDelegationTokenServlet extends DfsServlet {
+  private static final Log LOG = LogFactory.getLog(CancelDelegationTokenServlet.class);
+  public static final String PATH_SPEC = "/cancelDelegationToken";
+  public static final String TOKEN = "token";
+  
+  /**
+   * Cancel the delegation token given in the {@link #TOKEN} request parameter,
+   * acting as the user identified by {@code getUGI}.
+   * <p>
+   * Sends {@code SC_FORBIDDEN} if the user cannot be identified, an error if
+   * the token parameter is missing, and {@code SC_INTERNAL_SERVER_ERROR} with
+   * the exception message if the cancellation itself fails.
+   *
+   * @param req the request carrying the token in its URL string form
+   * @param resp the response on which errors are reported
+   * @throws ServletException declared by the overridden servlet method
+   * @throws IOException if sending an error fails (token decoding may also raise it)
+   */
+  @Override
+  protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
+      throws ServletException, IOException {
+    final UserGroupInformation ugi;
+    final ServletContext context = getServletContext();
+    final Configuration conf = 
+      (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+    try {
+      ugi = getUGI(req, conf);
+    } catch(IOException ioe) {
+      LOG.info("Request for token received with no authentication from "
+          + req.getRemoteAddr(), ioe);
+      resp.sendError(HttpServletResponse.SC_FORBIDDEN, 
+          "Unable to identify or authenticate user");
+      return;
+    }
+    final NameNode nn = (NameNode) context.getAttribute("name.node");
+    String tokenString = req.getParameter(TOKEN);
+    if (tokenString == null) {
+      // NOTE(review): 300 is an odd status for a missing parameter; kept as is
+      resp.sendError(HttpServletResponse.SC_MULTIPLE_CHOICES,
+                     "Token to cancel not specified");
+      // the error has been sent; do not go on to decode a null token
+      return;
+    }
+    final Token<DelegationTokenIdentifier> token = 
+      new Token<DelegationTokenIdentifier>();
+    token.decodeFromUrlString(tokenString);
+    
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          nn.cancelDelegationToken(token);
+          return null;
+        }
+      });
+    } catch(Exception e) {
+      LOG.info("Exception while cancelling token. Re-throwing. ", e);
+      resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+                     e.getMessage());
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java?rev=1077487&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java Fri Mar  4 04:19:59 2011
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Serve delegation tokens over http for use in hftp.
+ */
+@SuppressWarnings("serial")
+public class GetDelegationTokenServlet extends DfsServlet {
+  private static final Log LOG = LogFactory.getLog(GetDelegationTokenServlet.class);
+  public static final String PATH_SPEC = "/getDelegationToken";
+  public static final String RENEWER = "renewer";
+  
+  /** Writes a newly issued delegation token to the response as serialized Credentials. */
+  @Override
+  protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
+      throws ServletException, IOException {
+    final UserGroupInformation ugi;
+    final ServletContext context = getServletContext();
+    final Configuration conf = (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+    try {
+      ugi = getUGI(req, conf);
+    } catch(IOException ioe) {
+      LOG.info("Request for token received with no authentication from "
+          + req.getRemoteAddr(), ioe);
+      resp.sendError(HttpServletResponse.SC_FORBIDDEN, 
+          "Unable to identify or authenticate user");
+      return;
+    }
+    LOG.info("Sending token: {" + ugi.getUserName() + "," + req.getRemoteAddr() +"}");
+    final NameNode nn = (NameNode) context.getAttribute("name.node");
+    String renewer = req.getParameter(RENEWER);
+    if (renewer == null && req.getUserPrincipal() == null) { // no default renewer
+      resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Renewer not specified");
+      return;
+    }
+    final String renewerFinal = (renewer == null) ? 
+        req.getUserPrincipal().getName() : renewer;
+    DataOutputStream dos = null;
+    try {
+      dos = new DataOutputStream(resp.getOutputStream());
+      final DataOutputStream dosFinal = dos; // for doAs block
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          Token<DelegationTokenIdentifier> token = 
+            nn.getDelegationToken(new Text(renewerFinal));
+          String s = NameNode.getAddress(conf).getAddress().getHostAddress()
+                     + ":" + NameNode.getAddress(conf).getPort();
+          token.setService(new Text(s));
+          Credentials ts = new Credentials();
+          ts.addToken(new Text(ugi.getShortUserName()), token);
+          ts.write(dosFinal); // the stream is closed by the enclosing finally
+          return null;
+        }
+      });
+    } catch(Exception e) {
+      LOG.info("Exception while sending token. Re-throwing. ", e);
+      resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+    } finally {
+      if(dos != null) dos.close();
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Mar  4 04:19:59 2011
@@ -291,7 +291,15 @@ public class NameNode implements ClientP
           httpServer.setAttribute("name.system.image", getFSImage());
           httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
           httpServer.addInternalServlet("getDelegationToken", 
-              DelegationTokenServlet.PATH_SPEC, DelegationTokenServlet.class, true);
+                                        GetDelegationTokenServlet.PATH_SPEC, 
+                                        GetDelegationTokenServlet.class, true);
+          httpServer.addInternalServlet("renewDelegationToken", 
+                                        RenewDelegationTokenServlet.PATH_SPEC, 
+                                        RenewDelegationTokenServlet.class, true);
+          httpServer.addInternalServlet("cancelDelegationToken", 
+                                        CancelDelegationTokenServlet.PATH_SPEC, 
+                                        CancelDelegationTokenServlet.class,
+                                        true);
           httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true);
           httpServer.addInternalServlet("getimage", "/getimage", 
               GetImageServlet.class, true);

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java?rev=1077487&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java Fri Mar  4 04:19:59 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.security.PrivilegedExceptionAction;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Renew delegation tokens over http for use in hftp.
+ */
+@SuppressWarnings("serial")
+public class RenewDelegationTokenServlet extends DfsServlet {
+  private static final Log LOG = LogFactory.getLog(RenewDelegationTokenServlet.class);
+  public static final String PATH_SPEC = "/renewDelegationToken";
+  public static final String TOKEN = "token";
+  
+  /** Renews the token in the TOKEN parameter and prints the NameNode's result. */
+  @Override
+  protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
+      throws ServletException, IOException {
+    final UserGroupInformation ugi;
+    final ServletContext context = getServletContext();
+    final Configuration conf = 
+      (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+    try {
+      ugi = getUGI(req, conf);
+    } catch(IOException ioe) {
+      LOG.info("Request for token received with no authentication from "
+          + req.getRemoteAddr(), ioe);
+      resp.sendError(HttpServletResponse.SC_FORBIDDEN, 
+          "Unable to identify or authenticate user");
+      return;
+    }
+    final NameNode nn = (NameNode) context.getAttribute("name.node");
+    String tokenString = req.getParameter(TOKEN);
+    if (tokenString == null) {
+      // A missing parameter is a client error; stop before decoding null.
+      resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
+                     "Token to renew not specified");
+      return;
+    }
+    final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
+    token.decodeFromUrlString(tokenString);
+    try {
+      long result = ugi.doAs(new PrivilegedExceptionAction<Long>() {
+        public Long run() throws Exception {
+          return nn.renewDelegationToken(token);
+        }
+      });
+      PrintStream os = new PrintStream(resp.getOutputStream());
+      os.println(result);
+      os.close();
+    } catch(Exception e) {
+      LOG.info("Exception while renewing token. Re-throwing. ", e);
+      resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, e.getMessage());
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Fri Mar  4 04:19:59 2011
@@ -17,22 +17,30 @@
  */
 package org.apache.hadoop.hdfs.tools;
 
+import java.io.BufferedReader;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLConnection;
 import java.security.PrivilegedExceptionAction;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.DelegationTokenServlet;
+import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
+import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
+import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
@@ -54,6 +62,13 @@ public class DelegationTokenFetcher {
   private final UserGroupInformation ugi;
   private final DataOutputStream out;
   private final Configuration conf;
+  private static final Log LOG = 
+    LogFactory.getLog(DelegationTokenFetcher.class);
+  
+  static {
+    // Enable Kerberos sockets for https; runs once when this class is loaded.
+    System.setProperty("https.cipherSuites", "TLS_KRB5_WITH_3DES_EDE_CBC_SHA");
+  }
 
   /**
    * Command-line interface
@@ -136,36 +151,99 @@ public class DelegationTokenFetcher {
    * Utility method to obtain a delegation token over http
    * @param nnHttpAddr Namenode http addr, such as http://namenode:50070
    */
-  static public Credentials getDTfromRemote(String nnAddr, String renewer) 
-  throws IOException {
-    // Enable Kerberos sockets
-   System.setProperty("https.cipherSuites", "TLS_KRB5_WITH_3DES_EDE_CBC_SHA");
-   DataInputStream dis = null;
-   
-   try {
-     StringBuffer url = new StringBuffer();
-     if (renewer != null) {
-       url.append(nnAddr).append(DelegationTokenServlet.PATH_SPEC).append("?").
-       append(DelegationTokenServlet.RENEWER).append("=").append(renewer);
-     } else {
-       url.append(nnAddr).append(DelegationTokenServlet.PATH_SPEC);
-     }
-     System.out.println("Retrieving token from: " + url);
-     URL remoteURL = new URL(url.toString());
-     SecurityUtil.fetchServiceTicket(remoteURL);
-     URLConnection connection = remoteURL.openConnection();
-     
-     InputStream in = connection.getInputStream();
-     Credentials ts = new Credentials();
-     dis = new DataInputStream(in);
-     ts.readFields(dis);
-     return ts;
-   } catch (Exception e) {
-     throw new IOException("Unable to obtain remote token", e);
-   } finally {
-     if(dis != null) dis.close();
-   }
- }
+  static public Credentials getDTfromRemote(String nnAddr, 
+                                            String renewer) throws IOException {
+    DataInputStream tokenStream = null;
+    try {
+      // The servlet path is always appended; the renewer query is optional.
+      final StringBuilder target = new StringBuilder();
+      target.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC);
+      if (renewer != null) {
+        target.append("?").append(GetDelegationTokenServlet.RENEWER);
+        target.append("=").append(renewer);
+      }
+      System.out.println("Retrieving token from: " + target);
+      final URL tokenUrl = new URL(target.toString());
+      SecurityUtil.fetchServiceTicket(tokenUrl);
+      // The response body is read back as a serialized Credentials object.
+      tokenStream = new DataInputStream(tokenUrl.openConnection().getInputStream());
+      final Credentials fetched = new Credentials();
+      fetched.readFields(tokenStream);
+      return fetched;
+    } catch (Exception e) {
+      // Any failure is reported to the caller as an IOException.
+      throw new IOException("Unable to obtain remote token", e);
+    } finally {
+      if (tokenStream != null) {
+        tokenStream.close();
+      }
+    }
+  }
+  
+  /**
+   * Renew a Delegation Token through the NameNode's renew servlet.
+   * @param nnAddr the NameNode's address, used as the URL prefix
+   * @param tok the token to renew
+   * @return the number printed by the servlet (the NameNode's renewal result)
+   * @throws IOException if the request fails or the reply is not a number
+   */
+  static public long renewDelegationToken(String nnAddr,
+                                          Token<DelegationTokenIdentifier> tok
+                                          ) throws IOException {
+    StringBuilder buf = new StringBuilder();
+    buf.append(nnAddr).append(RenewDelegationTokenServlet.PATH_SPEC);
+    buf.append("?").append(RenewDelegationTokenServlet.TOKEN).append("=");
+    buf.append(tok.encodeToUrlString());
+    BufferedReader in = null;
+    try {
+      URL url = new URL(buf.toString());
+      SecurityUtil.fetchServiceTicket(url);
+      in = new BufferedReader(new InputStreamReader
+                              (url.openConnection().getInputStream()));
+      String reply = in.readLine();
+      if (reply == null) {
+        throw new IOException("Empty response while renewing token");
+      }
+      return Long.parseLong(reply);
+    } catch (NumberFormatException nfe) {
+      throw new IOException("Malformed response while renewing token", nfe);
+    } finally {
+      // Runs on every path so the reader is not leaked on a parse failure.
+      IOUtils.cleanup(LOG, in);
+    }
+  }
+
+  /**
+   * Cancel a Delegation Token through the NameNode's cancel servlet.
+   * @param nnAddr the NameNode's address, used as the URL prefix
+   * @param tok the token to cancel
+   * @throws IOException if the request fails or the reply is not HTTP 200
+   */
+  static public void cancelDelegationToken(String nnAddr,
+                                           Token<DelegationTokenIdentifier> tok
+                                           ) throws IOException {
+    StringBuilder buf = new StringBuilder();
+    buf.append(nnAddr);
+    buf.append(CancelDelegationTokenServlet.PATH_SPEC);
+    buf.append("?");
+    buf.append(CancelDelegationTokenServlet.TOKEN);
+    buf.append("=");
+    buf.append(tok.encodeToUrlString());
+    // No response body is read, so only the connection itself needs releasing.
+    HttpURLConnection connection = null;
+    try {
+      URL url = new URL(buf.toString());
+      SecurityUtil.fetchServiceTicket(url);
+      connection = (HttpURLConnection) url.openConnection();
+      if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+        throw new IOException("Error cancelling token:" + 
+                              connection.getResponseMessage());
+      }
+    } finally {
+      if(connection != null) connection.disconnect();
+    }
+  }
+
   /**
    * Utility method to obtain a delegation token over http
    * @param nnHttpAddr Namenode http addr, such as http://namenode:50070

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar  4 04:19:59 2011
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
@@ -761,13 +762,7 @@ public class JobClient extends Configure
         jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
         JobStatus status = null;
         try {
-          // load the binary file, if the user has one
-          String binaryTokenFilename = 
-            jobCopy.get("mapreduce.job.credentials.binary");
-          if (binaryTokenFilename != null) {
-            jobCopy.getCredentials().readTokenStorageFile
-               (new Path("file:///" +  binaryTokenFilename), jobCopy);
-          }
+          populateTokenCache(jobCopy, jobCopy.getCredentials());
 
           copyAndConfigureFiles(jobCopy, submitJobDir);
 
@@ -819,7 +814,7 @@ public class JobClient extends Configure
           //
           // Now, actually submit the job (using the submit name)
           //
-          populateTokenCache(jobCopy, jobCopy.getCredentials());
+          printTokens(jobId, jobCopy.getCredentials());
           status = jobSubmitClient.submitJob(
               jobId, submitJobDir.toString(), jobCopy.getCredentials());
           if (status != null) {
@@ -839,6 +834,20 @@ public class JobClient extends Configure
   }
 
   @SuppressWarnings("unchecked")
+  private void printTokens(JobID jobId, Credentials credentials) throws IOException {
+    if (!LOG.isDebugEnabled()) { // token details are debug-only output
+      return;
+    }
+    LOG.debug("Printing tokens for job: " + jobId);
+    for (Token<?> candidate : credentials.getAllTokens()) {
+      if (candidate.getKind().toString().equals("HDFS_DELEGATION_TOKEN")) {
+        LOG.debug("Submitting with " +
+            DFSClient.stringifyToken((Token<org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier>) candidate));
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
   private <T extends InputSplit>
   int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
       InterruptedException, ClassNotFoundException {
@@ -1894,11 +1903,19 @@ public class JobClient extends Configure
     System.exit(res);
   }
   
-  //get secret keys and tokens and store them into TokenCache
   @SuppressWarnings("unchecked")
-  private void populateTokenCache(Configuration conf, Credentials credentials) 
-  throws IOException{
-    // create TokenStorage object with user secretKeys
+  private void readTokensFromFiles(Configuration conf, Credentials credentials
+                                   ) throws IOException {
+    // add tokens and secrets coming from a token storage file
+    String binaryTokenFilename =
+      conf.get("mapreduce.job.credentials.binary");
+    if (binaryTokenFilename != null) {
+      Credentials binary = 
+        Credentials.readTokenStorageFile(new Path("file:///" +  
+                                                  binaryTokenFilename), conf);
+      credentials.addAll(binary);
+    }
+    // add secret keys coming from a json file
     String tokensFileName = conf.get("mapreduce.job.credentials.json");
     if(tokensFileName != null) {
       LOG.info("loading user's secret keys from " + tokensFileName);
@@ -1923,11 +1940,18 @@ public class JobClient extends Configure
       if(json_error)
         LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
     }
-    
+  }
+
+  //get secret keys and tokens and store them into TokenCache
+  @SuppressWarnings("unchecked")
+  private void populateTokenCache(Configuration conf, Credentials credentials) 
+  throws IOException{
+    readTokensFromFiles(conf, credentials);
  
     // add the delegation tokens from configuration
     String [] nameNodes = conf.getStrings(JobContext.JOB_NAMENODES);
-    LOG.debug("adding the following namenodes' delegation tokens:" + Arrays.toString(nameNodes));
+    LOG.debug("adding the following namenodes' delegation tokens:" + 
+              Arrays.toString(nameNodes));
     if(nameNodes != null) {
       Path [] ps = new Path[nameNodes.length];
       for(int i=0; i< nameNodes.length; i++) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar  4 04:19:59 2011
@@ -80,80 +80,45 @@ public class TokenCache {
   }
 
   static void obtainTokensForNamenodesInternal(Credentials credentials,
-                                               Path [] ps, Configuration conf)
-  throws IOException {
+                                               Path [] ps, 
+                                               Configuration conf
+                                               ) throws IOException {
     // get jobtracker principal id (for the renewer)
     KerberosName jtKrbName = new KerberosName(conf.get(JobTracker.JT_USER_NAME, ""));
-    Text delegTokenRenewer = new Text(jtKrbName.getShortName());
-    boolean notReadFile = true;
+    String delegTokenRenewer = jtKrbName.getShortName();
+    boolean readFile = true;
     for(Path p: ps) {
-      //TODO: Connecting to the namenode is not required in the case,
-      //where we already have the credentials in the file
       FileSystem fs = FileSystem.get(p.toUri(), conf);
-      if(fs instanceof DistributedFileSystem) {
-        DistributedFileSystem dfs = (DistributedFileSystem)fs;
-        URI uri = fs.getUri();
-        String fs_addr = 
-            SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
-
-        // see if we already have the token
-        Token<DelegationTokenIdentifier> token = 
-          TokenCache.getDelegationToken(credentials, fs_addr); 
-        if(token != null) {
-          LOG.debug("DT for " + token.getService()  + " is already present");
-          continue;
-        }
-        if (notReadFile) { //read the file only once
+      String fsName = fs.getCanonicalServiceName();
+      if (TokenCache.getDelegationToken(credentials, fsName) == null) {
+        //TODO: Need to come up with a better place to put
+        //this block of code to do with reading the file
+        if (readFile) {
+          readFile = false;
           String binaryTokenFilename =
             conf.get("mapreduce.job.credentials.binary");
           if (binaryTokenFilename != null) {
-            credentials.readTokenStorageFile(new Path("file:///" +  
-                binaryTokenFilename), conf);
+            Credentials binary;
+            try {
+              binary = Credentials.readTokenStorageFile(new Path("file:///" +  
+                  binaryTokenFilename), conf);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            credentials.addAll(binary);
           }
-          notReadFile = false;
-          token = 
-            TokenCache.getDelegationToken(credentials, fs_addr); 
-          if(token != null) {
-            LOG.debug("DT for " + token.getService()  + " is already present");
+          if (TokenCache.getDelegationToken(credentials, fsName) != null) {
+            LOG.debug("DT for " + fsName  + " is already present");
             continue;
           }
         }
-        // get the token
-        token = dfs.getDelegationToken(delegTokenRenewer);
-        if(token==null) {
-          LOG.warn("Token from " + fs_addr + " is null");
-          continue;
-        }
-
-        token.setService(new Text(fs_addr));
-        credentials.addToken(new Text(fs_addr), token);
-        LOG.info("Got dt for " + p + ";uri="+ fs_addr + 
-            ";t.service="+token.getService());
-      } else if (fs instanceof HftpFileSystem) {
-        String fs_addr = 
-          SecurityUtil.buildDTServiceName(fs.getUri(), NameNode.DEFAULT_PORT);
-        Token<DelegationTokenIdentifier> token = 
-          TokenCache.getDelegationToken(credentials, fs_addr); 
-        if(token != null) {
-          LOG.debug("DT for " + token.getService()  + " is already present");
-          continue;
-        }
-        //the initialize method of hftp, called via FileSystem.get() done
-        //earlier gets a delegation token
-        Token<? extends TokenIdentifier> t = ((HftpFileSystem) fs).getDelegationToken(); 
-        if (t != null) {
-          credentials.addToken(new Text(fs_addr), t);
-        
-          // in this case port in fs_addr is port for hftp request, but
-          // token's port is for RPC
-          // to find the correct DT we need to know the mapping between Hftp port 
-          // and RPC one. hence this new setting in the conf.
-          URI uri = ((HftpFileSystem) fs).getUri();
-          String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
-             SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
-          conf.set(key, t.getService().toString());
-          LOG.info("GOT dt for " + p + " and stored in conf as " + key + "=" 
-              + t.getService());
+        Token<?> token = fs.getDelegationToken(delegTokenRenewer);
+        if (token != null) {
+          Text fsNameText = new Text(fsName);
+          token.setService(fsNameText);
+          credentials.addToken(fsNameText, token);
+          LOG.info("Got dt for " + p + ";uri="+ fsName + 
+                   ";t.service="+token.getService());
         }
       }
     }
@@ -195,12 +160,14 @@ public class TokenCache {
   throws IOException {
     Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
     
-    Credentials ts = new Credentials();
-    ts.readTokenStorageFile(localJobTokenFile, conf);
+    Credentials ts = 
+      Credentials.readTokenStorageFile(localJobTokenFile, conf);
 
     if(LOG.isDebugEnabled()) {
-      LOG.debug("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath() 
-        +"; num of sec keys  = " + ts.numberOfSecretKeys() + " Number of tokens " + 
+      LOG.debug("Task: Loaded jobTokenFile from: "+
+          localJobTokenFile.toUri().getPath() 
+        +"; num of sec keys  = " + ts.numberOfSecretKeys() +
+        " Number of tokens " + 
         ts.numberOfTokens());
     }
     return ts;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java Fri Mar  4 04:19:59 2011
@@ -627,10 +627,6 @@ public class DistCp implements Tool {
   throws IOException {
     List<IOException> rslt = new ArrayList<IOException>();
     
-    // get tokens for all the required FileSystems..
-    // also set the renewer as the JobTracker for the hftp case
-    jobConf.set(HftpFileSystem.HFTP_RENEWER, 
-        jobConf.get(JobTracker.JT_USER_NAME, ""));
     Path[] ps = new Path[srcPaths.size()];
     ps = srcPaths.toArray(ps);
     TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);