You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2012/10/01 23:36:38 UTC

svn commit: r1392637 - in /hadoop/common/branches/branch-1: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/tools/ src/test/org/apache/hadoop/hdfs/

Author: omalley
Date: Mon Oct  1 21:36:38 2012
New Revision: 1392637

URL: http://svn.apache.org/viewvc?rev=1392637&view=rev
Log:
HDFS-3461. Make HFTP and HSFTP use http and https respectively for 
getting, renewing, and cancelling the underlying token. Systems with
weak crypto (kssl) configured will continue to use https. (omalley)

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/build.xml
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/HsftpFileSystem.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestHftpFileSystem.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1392637&r1=1392636&r2=1392637&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Oct  1 21:36:38 2012
@@ -610,6 +610,10 @@ Release 1.1.0 - unreleased
 
     MAPREDUCE-4558. Disable TestJobTrackerSafeMode (sseth)
 
+    HDFS-3461. Make HFTP and HSFTP use http and https respectively for 
+    getting, renewing, and cancelling the underlying token. Systems with
+    weak crypto (kssl) configured will continue to use https. (omalley)
+
 Release 1.0.4 - Unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-1/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/build.xml?rev=1392637&r1=1392636&r2=1392637&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/build.xml (original)
+++ hadoop/common/branches/branch-1/build.xml Mon Oct  1 21:36:38 2012
@@ -765,6 +765,7 @@
         <provider classname="org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer"/>
         <provider classname="org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier$Renewer"/>
         <provider classname="org.apache.hadoop.hdfs.HftpFileSystem$TokenManager"/>
+        <provider classname="org.apache.hadoop.hdfs.HsftpFileSystem$TokenManager"/>
         <provider classname="org.apache.hadoop.hdfs.web.WebHdfsFileSystem$DtRenewer"/>
       </service>
       <fileset dir="${conf.dir}" includes="${jar.properties.list}" />

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1392637&r1=1392636&r2=1392637&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java Mon Oct  1 21:36:38 2012
@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.namenode.JspHelper;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
 import org.apache.hadoop.io.Text;
@@ -91,12 +90,12 @@ public class HftpFileSystem extends File
   private URI hftpURI;
 
   protected InetSocketAddress nnAddr;
-  protected InetSocketAddress nnSecureAddr;  
 
   public static final String HFTP_TIMEZONE = "UTC";
   public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
 
   private Token<?> delegationToken;
+  private boolean createdToken = false;
   private Token<?> renewToken;
   private static final HftpDelegationTokenSelector hftpTokenSelector =
       new HftpDelegationTokenSelector();
@@ -120,27 +119,14 @@ public class HftpFileSystem extends File
         DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
   }
 
-  protected int getDefaultSecurePort() {
-    return !SecurityUtil.useKsslAuth() ? getDefaultPort() :
-        getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
-            DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
-  }
-
   protected InetSocketAddress getNamenodeAddr(URI uri) {
     // use authority so user supplied uri can override port
     return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
   }
 
-  protected InetSocketAddress getNamenodeSecureAddr(URI uri) {
-    // must only use the host and the configured https port
-    return NetUtils.makeSocketAddr(uri.getHost(), getDefaultSecurePort());
-  }
-
   @Override
   public String getCanonicalServiceName() {
-    // unlike other filesystems, hftp's service is the secure port, not the
-    // actual port in the uri
-    return SecurityUtil.buildTokenService(nnSecureAddr).toString();
+    return SecurityUtil.buildTokenService(nnAddr).toString();
   }
 
   @Override
@@ -150,7 +136,6 @@ public class HftpFileSystem extends File
     super.initialize(name, conf);
     this.ugi = UserGroupInformation.getCurrentUser();
     this.nnAddr = getNamenodeAddr(name);
-    this.nnSecureAddr = getNamenodeSecureAddr(name);
     this.hftpURI = DFSUtil.createUri(name.getScheme(), nnAddr);
   }
   
@@ -162,7 +147,6 @@ public class HftpFileSystem extends File
     }   
 
     //since we don't already have a token, go get one over https
-    boolean createdToken = false;
     if (token == null) {
       token = getDelegationToken(null);
       createdToken = (token != null);
@@ -181,7 +165,7 @@ public class HftpFileSystem extends File
   }
 
   protected Token<DelegationTokenIdentifier> selectHftpDelegationToken() {
-    Text serviceName = SecurityUtil.buildTokenService(nnSecureAddr);
+    Text serviceName = SecurityUtil.buildTokenService(nnAddr);
     return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());      
   }
   
@@ -190,12 +174,31 @@ public class HftpFileSystem extends File
         nnAddr, ugi, getConf());
   }
   
+  @Override
+  public void close() throws IOException {
+    // if we created a token, we should cancel it
+    if (createdToken) {
+      try {
+        renewToken.cancel(getConf());
+      } catch (InterruptedException ie) {
+        throw new RuntimeException(ie);
+      }
+    }
+    super.close();
+  }
 
   @Override
   public Token<?> getRenewToken() {
     return renewToken;
   }
 
+  /**
+   * Return the underlying protocol that is used to talk to the namenode.
+   */
+  protected String getUnderlyingProtocol() {
+    return "http";
+  }
+
   @Override
   public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
     renewToken = token;
@@ -213,35 +216,31 @@ public class HftpFileSystem extends File
   @Override
   public synchronized Token<?> getDelegationToken(final String renewer
                                                   ) throws IOException {
+    //Renew TGT if needed
+    ugi.checkTGTAndReloginFromKeytab();
+    Credentials c;
     try {
-      //Renew TGT if needed
-      ugi.checkTGTAndReloginFromKeytab();
-      return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
-        public Token<?> run() throws IOException {
-          final String nnHttpUrl = DFSUtil.createUri(
-              NameNode.getHttpUriScheme(), nnSecureAddr).toString();
-          Credentials c;
-          try {
-            c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
-          } catch (Exception e) {
-            LOG.info("Couldn't get a delegation token from " + nnHttpUrl + 
-            " using " + NameNode.getHttpUriScheme());
-            LOG.debug("error was ", e);
-            //Maybe the server is in unsecure mode (that's bad but okay)
-            remoteIsInsecure = true;
-            return null;
-          }
-          for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
-            LOG.debug("Got dt for " + getUri() + ";t.service="
-                      +t.getService());
-            return t;
-          }
-          return null;
+      c = ugi.doAs(new PrivilegedExceptionAction<Credentials>(){
+        public Credentials run() throws Exception {
+          return
+            DelegationTokenFetcher.getDTfromRemote(getUnderlyingProtocol(),
+                                                   nnAddr,
+                                                   renewer,
+                                                   getConf());
         }
       });
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
+    } catch (Exception e) {
+      LOG.info("Couldn't get a delegation token from " + nnAddr);
+      LOG.debug("error was ", e);
+      //Maybe the server is in unsecure mode (that's bad but okay)
+      remoteIsInsecure = true;
+      return null;
     }
+    for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
+      LOG.debug("Got dt for " + getUri() + ";t.service=" + t.getService());
+      return t;
+    }
+    return null;
   }
 
   @Override
@@ -281,8 +280,9 @@ public class HftpFileSystem extends File
       throws IOException {
     try {
       query = updateQuery(query);
-      final URL url = new URI("http", null, nnAddr.getHostName(),
-          nnAddr.getPort(), path, query, null).toURL();
+      final URL url = new URI(getUnderlyingProtocol(), null, 
+			      nnAddr.getHostName(),
+			      nnAddr.getPort(), path, query, null).toURL();
       if (LOG.isTraceEnabled()) {
         LOG.trace("url=" + url);
       }
@@ -653,7 +653,7 @@ public class HftpFileSystem extends File
     final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
     return cs != null? cs: super.getContentSummary(f);
   }
-  
+
   @InterfaceAudience.Private
   public static class TokenManager extends TokenRenewer {
 
@@ -667,17 +667,22 @@ public class HftpFileSystem extends File
       return true;
     }
 
+    protected String getUnderlyingProtocol() {
+      return "http";
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public long renew(Token<?> token, 
                       Configuration conf) throws IOException {
       // update the kerberos credentials, if they are coming from a keytab
       UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
-      // use http/s to renew the token
       InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
-      return DelegationTokenFetcher.renewDelegationToken(
-          DFSUtil.createUri(NameNode.getHttpUriScheme(), serviceAddr).toString(),
-          (Token<DelegationTokenIdentifier>) token);
+      return DelegationTokenFetcher.
+        renewDelegationToken(getUnderlyingProtocol(),
+                             serviceAddr,
+                             (Token<DelegationTokenIdentifier>) token,
+                             conf);
     }
 
     @SuppressWarnings("unchecked")
@@ -686,11 +691,12 @@ public class HftpFileSystem extends File
                        Configuration conf) throws IOException {
       // update the kerberos credentials, if they are coming from a keytab
       UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
-      // use http/s to cancel the token
       InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
-      DelegationTokenFetcher.cancelDelegationToken(
-          DFSUtil.createUri(NameNode.getHttpUriScheme(), serviceAddr).toString(),
-          (Token<DelegationTokenIdentifier>) token);
+      DelegationTokenFetcher.
+        cancelDelegationToken(getUnderlyingProtocol(),
+                              serviceAddr,
+                              (Token<DelegationTokenIdentifier>) token,
+                              conf);
     }
     
   }

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/HsftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/HsftpFileSystem.java?rev=1392637&r1=1392636&r2=1392637&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/HsftpFileSystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/HsftpFileSystem.java Mon Oct  1 21:36:38 2012
@@ -20,15 +20,23 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSession;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer;
+import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 
 /** An implementation of a protocol for accessing filesystems over HTTPS.
  * The following implementation provides a limited, read-only interface
@@ -37,33 +45,19 @@ import org.apache.hadoop.conf.Configurat
  * @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
  */
 public class HsftpFileSystem extends HftpFileSystem {
+  public static final Text TOKEN_KIND = new Text("HSFTP delegation");
+
+  private static final DelegationTokenRenewer<HsftpFileSystem> dtRenewer
+      = new DelegationTokenRenewer<HsftpFileSystem>(HsftpFileSystem.class);
+  private static final HsftpDelegationTokenSelector hftpTokenSelector =
+      new HsftpDelegationTokenSelector();
 
   @Override
   public void initialize(URI name, Configuration conf) throws IOException {
     super.initialize(name, conf);
-    setupSsl(conf);
+    DelegationTokenFetcher.setupSsl(conf);
   }
 
-  /** Set up SSL resources */
-  private static void setupSsl(Configuration conf) {
-    Configuration sslConf = new Configuration(false);
-    sslConf.addResource(conf.get("dfs.https.client.keystore.resource",
-        "ssl-client.xml"));
-    System.setProperty("javax.net.ssl.trustStore", sslConf.get(
-        "ssl.client.truststore.location", ""));
-    System.setProperty("javax.net.ssl.trustStorePassword", sslConf.get(
-        "ssl.client.truststore.password", ""));
-    System.setProperty("javax.net.ssl.trustStoreType", sslConf.get(
-        "ssl.client.truststore.type", "jks"));
-    System.setProperty("javax.net.ssl.keyStore", sslConf.get(
-        "ssl.client.keystore.location", ""));
-    System.setProperty("javax.net.ssl.keyStorePassword", sslConf.get(
-        "ssl.client.keystore.password", ""));
-    System.setProperty("javax.net.ssl.keyPassword", sslConf.get(
-        "ssl.client.keystore.keypassword", ""));
-    System.setProperty("javax.net.ssl.keyStoreType", sslConf.get(
-        "ssl.client.keystore.type", "jks"));
-  }
 
   @Override
   protected int getDefaultPort() {
@@ -71,9 +65,11 @@ public class HsftpFileSystem extends Hft
         DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
   }
 
-  @Override
-  protected InetSocketAddress getNamenodeSecureAddr(URI uri) {
-    return getNamenodeAddr(uri);
+  /**
+   * Return the underlying protocol that is used to talk to the namenode.
+   */
+  protected String getUnderlyingProtocol() {
+    return "https";
   }
 
   @Override
@@ -81,8 +77,9 @@ public class HsftpFileSystem extends Hft
       throws IOException {
     try {
       query = updateQuery(query);
-      final URL url = new URI("https", null, nnAddr.getHostName(),
-          nnAddr.getPort(), path, query, null).toURL();
+      final URL url = new URI(getUnderlyingProtocol(), null, 
+			      nnAddr.getHostName(),
+			      nnAddr.getPort(), path, query, null).toURL();
       HttpsURLConnection conn = (HttpsURLConnection)url.openConnection();
       // bypass hostname verification
       conn.setHostnameVerifier(new DummyHostnameVerifier());
@@ -101,4 +98,30 @@ public class HsftpFileSystem extends Hft
     }
   }
 
+  protected Token<DelegationTokenIdentifier> selectHftpDelegationToken() {
+    Text serviceName = SecurityUtil.buildTokenService(nnAddr);
+    return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());      
+  }
+  
+  @InterfaceAudience.Private
+  public static class TokenManager extends HftpFileSystem.TokenManager {
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return kind.equals(TOKEN_KIND);
+    }
+
+    protected String getUnderlyingProtocol() {
+      return "https";
+    }
+  }
+  
+  private static class HsftpDelegationTokenSelector
+  extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
+
+    public HsftpDelegationTokenSelector() {
+      super(TOKEN_KIND);
+    }
+  }
+
 }

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1392637&r1=1392636&r2=1392637&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Mon Oct  1 21:36:38 2012
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.URL;
 import java.net.URLConnection;
 import java.security.PrivilegedExceptionAction;
@@ -37,7 +38,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HftpFileSystem;
+import org.apache.hadoop.hdfs.HsftpFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
 import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
@@ -46,6 +50,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -67,10 +72,10 @@ public class DelegationTokenFetcher {
   private static final String WEBSERVICE = "webservice";
   private static final String CANCEL = "cancel";
   private static final String RENEW = "renew";
-  
+
   static {
-    // Enable Kerberos sockets
-    System.setProperty("https.cipherSuites", "TLS_KRB5_WITH_3DES_EDE_CBC_SHA");
+    // reference a field to make sure the static blocks run
+    int x = Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.size();
   }
 
   private static void printUsage(PrintStream err) throws IOException {
@@ -97,6 +102,7 @@ public class DelegationTokenFetcher {
    */
   public static void main(final String [] args) throws Exception {
     final Configuration conf = new Configuration();
+    setupSsl(conf);
     Options fetcherOptions = new Options();
     fetcherOptions.addOption(WEBSERVICE, true, 
                              "HTTP/S url to reach the NameNode at");
@@ -126,135 +132,199 @@ public class DelegationTokenFetcher {
     FileSystem local = FileSystem.getLocal(conf);
     final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
 
-    // Login the current user
-    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    ugi.doAs(new PrivilegedExceptionAction<Object>() {
-      @Override
-      public Object run() throws Exception {
-        
-        if (cancel) {
-          for(Token<?> token: readTokens(tokenFile, conf)) {
-            if (token.isManaged()) {
-              token.cancel(conf);
-              if(LOG.isDebugEnabled()) {
-                LOG.debug("Cancelled token for " + token.getService());
-              }
-            }
-          }          
-        } else if (renew) {
-          for(Token<?> token: readTokens(tokenFile, conf)) {
-            if (token.isManaged()) {
-              token.renew(conf);
-              if(LOG.isDebugEnabled()) {
-                LOG.debug("Renewed token for " + token.getService());
-              }
-            }
-          }          
-        } else {
-          if (webUrl != null) {
-            getDTfromRemote(webUrl, null).
-              writeTokenStorageFile(tokenFile, conf);
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Fetched token via http for " + webUrl);
-            }
-          } else {
-            FileSystem fs = FileSystem.get(conf);
-            Token<?> token = fs.getDelegationToken(ugi.getShortUserName());
-            Credentials cred = new Credentials();
-            cred.addToken(token.getService(), token);
-            cred.writeTokenStorageFile(tokenFile, conf);
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Fetched token for " + fs.getUri() + " into " +
-                               tokenFile);
-            }
-          }        
+    if (cancel) {
+      for(Token<?> token: readTokens(tokenFile, conf)) {
+        if (token.isManaged()) {
+          token.cancel(conf);
         }
-        return null;
-      }
-    });
-
+      }          
+    } else if (renew) {
+      for(Token<?> token: readTokens(tokenFile, conf)) {
+        if (token.isManaged()) {
+          token.renew(conf);
+        }
+      }          
+    } else {
+      if (webUrl != null) {
+        URI uri = new URI(webUrl);
+        getDTfromRemote(uri.getScheme(), 
+                        new InetSocketAddress(uri.getHost(), uri.getPort()),
+                        null,
+                        conf).
+          writeTokenStorageFile(tokenFile, conf);
+      } else {
+        FileSystem fs = FileSystem.get(conf);
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        Token<?> token = fs.getDelegationToken(ugi.getShortUserName());
+        Credentials cred = new Credentials();
+        cred.addToken(token.getService(), token);
+        cred.writeTokenStorageFile(tokenFile, conf);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Fetched token for " + fs.getUri() + " into " +
+                    tokenFile);
+        }
+      }        
+    }
   }
   
+  /** Set up SSL resources */
+  public static void setupSsl(Configuration conf) {
+    Configuration sslConf = new Configuration(false);
+    sslConf.addResource(conf.get("dfs.https.client.keystore.resource",
+        "ssl-client.xml"));
+    System.setProperty("javax.net.ssl.trustStore", sslConf.get(
+        "ssl.client.truststore.location", ""));
+    System.setProperty("javax.net.ssl.trustStorePassword", sslConf.get(
+        "ssl.client.truststore.password", ""));
+    System.setProperty("javax.net.ssl.trustStoreType", sslConf.get(
+        "ssl.client.truststore.type", "jks"));
+    System.setProperty("javax.net.ssl.keyStore", sslConf.get(
+        "ssl.client.keystore.location", ""));
+    System.setProperty("javax.net.ssl.keyStorePassword", sslConf.get(
+        "ssl.client.keystore.password", ""));
+    System.setProperty("javax.net.ssl.keyPassword", sslConf.get(
+        "ssl.client.keystore.keypassword", ""));
+    System.setProperty("javax.net.ssl.keyStoreType", sslConf.get(
+        "ssl.client.keystore.type", "jks"));
+  }
+
   /**
    * Utility method to obtain a delegation token over http
-   * @param nnAddr Namenode http addr, such as http://namenode:50070
+   * @param protocol whether to use http or https
+   * @param nnAddr the address for the NameNode
    * @param renewer User that is renewing the ticket in such a request
+   * @param conf the configuration
    */
-  static public Credentials getDTfromRemote(String nnAddr, 
-      String renewer) throws IOException {
-    DataInputStream dis = null;
-    InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnAddr);
+  static public Credentials getDTfromRemote(String protocol,
+                                            final InetSocketAddress nnAddr,
+                                            String renewer,
+                                            Configuration conf
+                                            ) throws IOException {
+    final String renewAddress = getRenewAddress(protocol, nnAddr, conf);
+    final boolean https = "https".equals(protocol);
 
     try {
-      StringBuffer url = new StringBuffer();
+      StringBuffer url = new StringBuffer(renewAddress);
+      url.append(GetDelegationTokenServlet.PATH_SPEC);
       if (renewer != null) {
-        url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC).append("?").
-        append(GetDelegationTokenServlet.RENEWER).append("=").append(renewer);
-      } else {
-        url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC);
+        url.append("?").
+          append(GetDelegationTokenServlet.RENEWER).append("=").
+          append(renewer);
       }
       if(LOG.isDebugEnabled()) {
         LOG.debug("Retrieving token from: " + url);
       }
-      URL remoteURL = new URL(url.toString());
-      URLConnection connection = SecurityUtil.openSecureHttpConnection(remoteURL);
-
-      InputStream in = connection.getInputStream();
-      Credentials ts = new Credentials();
-      dis = new DataInputStream(in);
-      ts.readFields(dis);
-      for(Token<?> token: ts.getAllTokens()) {
-        token.setKind(HftpFileSystem.TOKEN_KIND);
-        SecurityUtil.setTokenService(token, serviceAddr);
-      }
-      return ts;
-    } catch (Exception e) {
-      throw new IOException("Unable to obtain remote token", e);
-    } finally {
-      if(dis != null) dis.close();
+      final URL remoteURL = new URL(url.toString());
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      return ugi.doAs(new PrivilegedExceptionAction<Credentials>(){
+        public Credentials run() throws Exception {
+          URLConnection connection = 
+            SecurityUtil.openSecureHttpConnection(remoteURL);
+
+          InputStream in = connection.getInputStream();
+          Credentials ts = new Credentials();
+          DataInputStream dis = new DataInputStream(in);
+          try {
+            ts.readFields(dis);
+            for(Token<?> token: ts.getAllTokens()) {
+              if (https) {
+                token.setKind(HsftpFileSystem.TOKEN_KIND);
+              } else {
+                token.setKind(HftpFileSystem.TOKEN_KIND);
+              }
+              SecurityUtil.setTokenService(token, nnAddr);
+            }
+            dis.close();
+          } catch (IOException ie) {
+            IOUtils.cleanup(LOG, dis);
+          }
+          return ts;
+        }
+      });
+    } catch (InterruptedException ie) {
+      return null;
     }
   }
   
   /**
+   * Get the URI that we use for getting, renewing, and cancelling the
+   * delegation token. For KSSL with hftp that means we need to use
+   * https and the NN's https port.
+   */
+  protected static String getRenewAddress(String protocol,
+                                          InetSocketAddress addr,
+                                          Configuration conf) {
+    if (SecurityUtil.useKsslAuth() && "http".equals(protocol)) {
+      protocol = "https";
+      int port = 
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
+                    DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
+      addr = new InetSocketAddress(addr.getAddress(), port);
+    }
+    return DFSUtil.createUri(protocol, addr).toString();
+  }
+
+  /**
    * Renew a Delegation Token.
-   * @param nnAddr the NameNode's address
+   * @param protocol The protocol to renew over (http or https)
+   * @param addr the address of the NameNode
    * @param tok the token to renew
+   * @param conf the configuration
    * @return the Date that the token will expire next.
    * @throws IOException
    */
-  static public long renewDelegationToken(String nnAddr,
-      Token<DelegationTokenIdentifier> tok) throws IOException {
-    StringBuilder buf = new StringBuilder();
-    buf.append(nnAddr);
+  static public long renewDelegationToken(String protocol,
+                                          InetSocketAddress addr,
+                                          Token<DelegationTokenIdentifier> tok,
+                                          Configuration conf
+                                          ) throws IOException {
+    final String renewAddress = getRenewAddress(protocol, addr, conf);
+    final StringBuilder buf = new StringBuilder(renewAddress);
+    final String service = tok.getService().toString();
     buf.append(RenewDelegationTokenServlet.PATH_SPEC);
     buf.append("?");
     buf.append(RenewDelegationTokenServlet.TOKEN);
     buf.append("=");
     buf.append(tok.encodeToUrlString());
-    BufferedReader in = null;
-    HttpURLConnection connection = null;
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     try {
-      URL url = new URL(buf.toString());
-      connection = (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url);
-      in = new BufferedReader(new InputStreamReader
-                              (connection.getInputStream()));
-      long result = Long.parseLong(in.readLine());
-      in.close();
-      return result;
-    } catch (IOException ie) {
-      LOG.info("error in renew over HTTP", ie);
-      IOException e = null;
-      if(connection != null) {
-        String resp = connection.getResponseMessage();
-        e = getExceptionFromResponse(resp);
-      }
+      return ugi.doAs(new PrivilegedExceptionAction<Long>(){
+        public Long run() throws Exception {
+          BufferedReader in = null;
+          HttpURLConnection connection = null;
+          try {
+            URL url = new URL(buf.toString());
+            connection = 
+              (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url);
+            in = new BufferedReader(new InputStreamReader
+                                    (connection.getInputStream()));
+            long result = Long.parseLong(in.readLine());
+            in.close();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Renewed token for " + service + " via " + 
+                        renewAddress);
+            }
+            return result;
+          } catch (IOException ie) {
+            LOG.info("Error renewing token for " + renewAddress, ie);
+            IOException e = null;
+            if(connection != null) {
+              String resp = connection.getResponseMessage();
+              e = getExceptionFromResponse(resp);
+            }
       
-      IOUtils.cleanup(LOG, in);
-      if(e!=null) {
-        LOG.info("rethrowing exception from HTTP request: " + e.getLocalizedMessage());
-        throw e;
-      }
-      throw ie;
+            IOUtils.cleanup(LOG, in);
+            if (e!=null) {
+              LOG.info("rethrowing exception from HTTP request: " + 
+                       e.getLocalizedMessage());
+              throw e;
+            }
+            throw ie;
+          }
+        }
+        });
+    } catch (InterruptedException ie) {
+      return 0;
     }
   }
 
@@ -287,10 +357,13 @@ public class DelegationTokenFetcher {
    * @param tok the token to cancel
    * @throws IOException
    */
-  static public void cancelDelegationToken(String nnAddr,
-    Token<DelegationTokenIdentifier> tok) throws IOException {
-    StringBuilder buf = new StringBuilder();
-    buf.append(nnAddr);
+  static public void cancelDelegationToken(String protocol,
+                                           InetSocketAddress addr,
+                                           Token<DelegationTokenIdentifier> tok,
+                                           Configuration conf
+                                           ) throws IOException {
+    final String renewAddress = getRenewAddress(protocol, addr, conf);
+    StringBuilder buf = new StringBuilder(renewAddress);
     buf.append(CancelDelegationTokenServlet.PATH_SPEC);
     buf.append("?");
     buf.append(CancelDelegationTokenServlet.TOKEN);
@@ -298,16 +371,33 @@ public class DelegationTokenFetcher {
     buf.append(tok.encodeToUrlString());
     BufferedReader in = null;
     try {
-      URL url = new URL(buf.toString());
-      HttpURLConnection connection =
+      final URL url = new URL(buf.toString());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("cancelling token at " + buf.toString());
+      }
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      ugi.doAs(new PrivilegedExceptionAction<Void>(){
+          public Void run() throws Exception {
+            HttpURLConnection connection =
               (HttpURLConnection)SecurityUtil.openSecureHttpConnection(url);
-      if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
-        throw new IOException("Error cancelling token:" + 
-                              connection.getResponseMessage());
+            if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+              throw new IOException("Error cancelling token for " + 
+                                    renewAddress + " response: " +
+                                    connection.getResponseMessage());
+            }
+            return null;
+          }
+        });
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cancelled token for " + tok.getService() + " via " + 
+                  renewAddress);
       }
     } catch (IOException ie) {
+      LOG.warn("Error cancelling token for " + renewAddress, ie);
       IOUtils.cleanup(LOG, in);
       throw ie;
+    } catch (InterruptedException ie) {
+      // PASS
     }
   }
 }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestHftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestHftpFileSystem.java?rev=1392637&r1=1392636&r2=1392637&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestHftpFileSystem.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestHftpFileSystem.java Mon Oct  1 21:36:38 2012
@@ -52,8 +52,8 @@ public class TestHftpFileSystem {
     URI uri = URI.create("hftp://localhost");
     HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
 
-    assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort());
-    assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultSecurePort());
+    assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
+        fs.getDefaultPort());
 
     URI fsUri = fs.getUri();
     assertEquals(uri.getHost(), fsUri.getHost());
@@ -75,7 +75,6 @@ public class TestHftpFileSystem {
     HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
 
     assertEquals(123, fs.getDefaultPort());
-    assertEquals(123, fs.getDefaultSecurePort());
     
     URI fsUri = fs.getUri();
     assertEquals(uri.getHost(), fsUri.getHost());
@@ -94,15 +93,13 @@ public class TestHftpFileSystem {
     HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
 
     assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort());
-    assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultSecurePort());
 
     URI fsUri = fs.getUri();
     assertEquals(uri.getHost(), fsUri.getHost());
     assertEquals(uri.getPort(), fsUri.getPort());
     
-    assertEquals(
-        "127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
-        fs.getCanonicalServiceName()
+    assertEquals("127.0.0.1:123",
+                 fs.getCanonicalServiceName()
     );
   }
 
@@ -116,15 +113,13 @@ public class TestHftpFileSystem {
     HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
 
     assertEquals(123, fs.getDefaultPort());
-    assertEquals(123, fs.getDefaultSecurePort());
     
     URI fsUri = fs.getUri();
     assertEquals(uri.getHost(), fsUri.getHost());
     assertEquals(789, fsUri.getPort());
     
-    assertEquals(
-        "127.0.0.1:123",
-        fs.getCanonicalServiceName()
+    assertEquals("127.0.0.1:789",
+                 fs.getCanonicalServiceName()
     );
   }
 
@@ -137,7 +132,6 @@ public class TestHftpFileSystem {
     HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
 
     assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort());
-    assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
 
     URI fsUri = fs.getUri();
     assertEquals(uri.getHost(), fsUri.getHost());
@@ -159,7 +153,6 @@ public class TestHftpFileSystem {
     HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
 
     assertEquals(456, fs.getDefaultPort());
-    assertEquals(456, fs.getDefaultSecurePort());
     
     URI fsUri = fs.getUri();
     assertEquals(uri.getHost(), fsUri.getHost());
@@ -178,7 +171,6 @@ public class TestHftpFileSystem {
     HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
 
     assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort());
-    assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
 
     URI fsUri = fs.getUri();
     assertEquals(uri.getHost(), fsUri.getHost());
@@ -200,7 +192,6 @@ public class TestHftpFileSystem {
     HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
 
     assertEquals(456, fs.getDefaultPort());
-    assertEquals(456, fs.getDefaultSecurePort());
     
     URI fsUri = fs.getUri();
     assertEquals(uri.getHost(), fsUri.getHost());