You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xi...@apache.org on 2018/10/12 16:35:54 UTC

[2/2] hadoop git commit: HADOOP-14445. Use DelegationTokenIssuer to create KMS delegation tokens that can authenticate to all KMS instances. Contributed by Daryn Sharp, Xiao Chen, Rushabh S Shah.

HADOOP-14445. Use DelegationTokenIssuer to create KMS delegation tokens that can authenticate to all KMS instances.
Contributed by Daryn Sharp, Xiao Chen, Rushabh S Shah.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ec86b44
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ec86b44
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ec86b44

Branch: refs/heads/trunk
Commit: 5ec86b445cc492f52c33639efb6a09a0d2f27475
Parents: 6e0e6da
Author: Xiao Chen <xi...@apache.org>
Authored: Fri Oct 12 09:32:21 2018 -0700
Committer: Xiao Chen <xi...@apache.org>
Committed: Fri Oct 12 09:35:52 2018 -0700

----------------------------------------------------------------------
 .../KeyProviderDelegationTokenExtension.java    |  71 ++--
 .../crypto/key/KeyProviderTokenIssuer.java      |   4 +-
 .../crypto/key/kms/KMSClientProvider.java       | 220 ++++++++----
 .../key/kms/LoadBalancingKMSClientProvider.java |  75 +++-
 .../java/org/apache/hadoop/fs/FileSystem.java   |  75 +---
 .../web/DelegationTokenAuthenticatedURL.java    |  25 +-
 .../security/token/DelegationTokenIssuer.java   | 112 ++++++
 .../java/org/apache/hadoop/util/KMSUtil.java    |  13 +-
 ...TestKeyProviderDelegationTokenExtension.java |  20 +-
 .../crypto/key/kms/TestKMSClientProvider.java   | 138 ++++++++
 .../kms/TestLoadBalancingKMSClientProvider.java |  63 +++-
 .../apache/hadoop/fs/TestFilterFileSystem.java  |   3 +
 .../org/apache/hadoop/fs/TestHarFileSystem.java |   3 +
 .../hadoop/crypto/key/kms/server/TestKMS.java   | 349 ++++++++++++++++---
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  11 +-
 .../hadoop/hdfs/DistributedFileSystem.java      |  14 +-
 .../org/apache/hadoop/hdfs/HdfsKMSUtil.java     |  60 ++--
 .../hadoop/hdfs/web/WebHdfsFileSystem.java      |  20 +-
 18 files changed, 963 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
index a63b7d5..29c5bcd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.crypto.key;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 import java.io.IOException;
 
@@ -28,7 +32,8 @@ import java.io.IOException;
  */
 public class KeyProviderDelegationTokenExtension extends
     KeyProviderExtension
-    <KeyProviderDelegationTokenExtension.DelegationTokenExtension> {
+    <KeyProviderDelegationTokenExtension.DelegationTokenExtension>
+    implements DelegationTokenIssuer {
   
   private static DelegationTokenExtension DEFAULT_EXTENSION = 
       new DefaultDelegationTokenExtension();
@@ -36,22 +41,9 @@ public class KeyProviderDelegationTokenExtension extends
   /**
    * DelegationTokenExtension is a type of Extension that exposes methods
    * needed to work with Delegation Tokens.
-   */  
-  public interface DelegationTokenExtension extends 
-    KeyProviderExtension.Extension {
-    
-    /**
-     * The implementer of this class will take a renewer and add all
-     * delegation tokens associated with the renewer to the 
-     * <code>Credentials</code> object if it is not already present, 
-     * @param renewer the user allowed to renew the delegation tokens
-     * @param credentials cache in which to add new delegation tokens
-     * @return list of new delegation tokens
-     * @throws IOException thrown if IOException if an IO error occurs.
-     */
-    Token<?>[] addDelegationTokens(final String renewer,
-        Credentials credentials) throws IOException;
-
+   */
+  public interface DelegationTokenExtension
+      extends KeyProviderExtension.Extension, DelegationTokenIssuer {
     /**
      * Renews the given token.
      * @param token The token to be renewed.
@@ -66,6 +58,12 @@ public class KeyProviderDelegationTokenExtension extends
      * @throws IOException
      */
     Void cancelDelegationToken(final Token<?> token) throws IOException;
+
+    // Do NOT call this. Only intended for internal use.
+    @VisibleForTesting
+    @InterfaceAudience.Private
+    @InterfaceStability.Unstable
+    Token<?> selectDelegationToken(Credentials creds);
   }
   
   /**
@@ -82,6 +80,16 @@ public class KeyProviderDelegationTokenExtension extends
     }
 
     @Override
+    public String getCanonicalServiceName() {
+      return null;
+    }
+
+    @Override
+    public Token<?> getDelegationToken(String renewer) {
+      return null;
+    }
+
+    @Override
     public long renewDelegationToken(final Token<?> token) throws IOException {
       return 0;
     }
@@ -90,26 +98,29 @@ public class KeyProviderDelegationTokenExtension extends
     public Void cancelDelegationToken(final Token<?> token) throws IOException {
       return null;
     }
+
+    @Override
+    public Token<?> selectDelegationToken(Credentials creds) {
+      return null;
+    }
+
   }
 
   private KeyProviderDelegationTokenExtension(KeyProvider keyProvider,
       DelegationTokenExtension extensions) {
     super(keyProvider, extensions);
   }
-  
-  /**
-   * Passes the renewer and Credentials object to the underlying 
-   * {@link DelegationTokenExtension} 
-   * @param renewer the user allowed to renew the delegation tokens
-   * @param credentials cache in which to add new delegation tokens
-   * @return list of new delegation tokens
-   * @throws IOException thrown if IOException if an IO error occurs.
-   */
-  public Token<?>[] addDelegationTokens(final String renewer, 
-      Credentials credentials) throws IOException {
-    return getExtension().addDelegationTokens(renewer, credentials);
+
+  @Override
+  public String getCanonicalServiceName() {
+    return getExtension().getCanonicalServiceName();
   }
-  
+
+  @Override
+  public Token<?> getDelegationToken(final String renewer) throws IOException {
+    return getExtension().getDelegationToken(renewer);
+  }
+
   /**
    * Creates a <code>KeyProviderDelegationTokenExtension</code> using a given 
    * {@link KeyProvider}.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java
index aa5de2c..81caff4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java
@@ -22,15 +22,17 @@ import java.net.URI;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 /**
  * File systems that support Encryption Zones have to implement this interface.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public interface KeyProviderTokenIssuer {
+public interface KeyProviderTokenIssuer extends DelegationTokenIssuer {
 
   KeyProvider getKeyProvider() throws IOException;
 
   URI getKeyProviderUri() throws IOException;
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
index 26b528c..1718a1f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
@@ -32,14 +32,13 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
 import org.apache.hadoop.util.HttpExceptionUtils;
 import org.apache.hadoop.util.JsonSerialization;
@@ -58,7 +57,6 @@ import java.io.Writer;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.ConnectException;
 import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.SocketTimeoutException;
 import java.net.URI;
@@ -99,7 +97,7 @@ import static org.apache.hadoop.util.KMSUtil.parseJSONMetadata;
 public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     KeyProviderDelegationTokenExtension.DelegationTokenExtension {
 
-  private static final Logger LOG =
+  static final Logger LOG =
       LoggerFactory.getLogger(KMSClientProvider.class);
 
   private static final String INVALID_SIGNATURE = "Invalid signature";
@@ -133,12 +131,12 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
 
   private final ValueQueue<EncryptedKeyVersion> encKeyVersionQueue;
 
+  private KeyProviderDelegationTokenExtension.DelegationTokenExtension
+      clientTokenProvider = this;
+  // the token's service.
   private final Text dtService;
-
-  // Allow fallback to default kms server port 9600 for certain tests that do
-  // not specify the port explicitly in the kms provider url.
-  @VisibleForTesting
-  public static volatile boolean fallbackDefaultPortForTesting = false;
+  // alias in the credentials.
+  private final Text canonicalService;
 
   private class EncryptedQueueRefiller implements
     ValueQueue.QueueRefiller<EncryptedKeyVersion> {
@@ -162,6 +160,14 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     }
   }
 
+  static class TokenSelector extends AbstractDelegationTokenSelector {
+    static final TokenSelector INSTANCE = new TokenSelector();
+
+    TokenSelector() {
+      super(TOKEN_KIND);
+    }
+  }
+
   /**
    * The KMS implementation of {@link TokenRenewer}.
    */
@@ -182,8 +188,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     @Override
     public long renew(Token<?> token, Configuration conf) throws IOException {
       LOG.debug("Renewing delegation token {}", token);
-      KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
-          KeyProviderFactory.KEY_PROVIDER_PATH);
+      KeyProvider keyProvider = createKeyProvider(token, conf);
       try {
         if (!(keyProvider instanceof
             KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
@@ -204,8 +209,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     @Override
     public void cancel(Token<?> token, Configuration conf) throws IOException {
       LOG.debug("Canceling delegation token {}", token);
-      KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
-          KeyProviderFactory.KEY_PROVIDER_PATH);
+      KeyProvider keyProvider = createKeyProvider(token, conf);
       try {
         if (!(keyProvider instanceof
             KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
@@ -222,6 +226,19 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
         }
       }
     }
+
+    private static KeyProvider createKeyProvider(
+        Token<?> token, Configuration conf) throws IOException {
+      String service = token.getService().toString();
+      URI uri;
+      if (service != null && service.startsWith(SCHEME_NAME + ":/")) {
+        LOG.debug("Creating key provider with token service value {}", service);
+        uri = URI.create(service);
+      } else { // conf fallback
+        uri = KMSUtil.getKeyProviderUri(conf);
+      }
+      return (uri != null) ? KMSUtil.createKeyProviderFromUri(conf, uri) : null;
+    }
   }
 
   public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
@@ -283,12 +300,14 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
           }
           hostsPart = t[0];
         }
-        return createProvider(conf, origUrl, port, hostsPart);
+        KMSClientProvider[] providers =
+            createProviders(conf, origUrl, port, hostsPart);
+        return new LoadBalancingKMSClientProvider(providerUri, providers, conf);
       }
       return null;
     }
 
-    private KeyProvider createProvider(Configuration conf,
+    private KMSClientProvider[] createProviders(Configuration conf,
         URL origUrl, int port, String hostsPart) throws IOException {
       String[] hosts = hostsPart.split(";");
       KMSClientProvider[] providers = new KMSClientProvider[hosts.length];
@@ -302,7 +321,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
           throw new IOException("Could not instantiate KMSProvider.", e);
         }
       }
-      return new LoadBalancingKMSClientProvider(providers, conf);
+      return providers;
     }
   }
 
@@ -358,13 +377,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
   public KMSClientProvider(URI uri, Configuration conf) throws IOException {
     super(conf);
     kmsUrl = createServiceURL(extractKMSPath(uri));
-    int kmsPort = kmsUrl.getPort();
-    if ((kmsPort == -1) && fallbackDefaultPortForTesting) {
-      kmsPort = 9600;
-    }
-
-    InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort);
-    dtService = SecurityUtil.buildTokenService(addr);
+    // the token's service so it can be instantiated for renew/cancel.
+    dtService = getDtService(uri);
+    // the canonical service is the alias for the token in the credentials.
+    // typically it's the actual service in the token but older clients expect
+    // an address.
+    URI serviceUri = URI.create(kmsUrl.toString());
+    canonicalService = SecurityUtil.buildTokenService(serviceUri);
 
     if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
       sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
@@ -402,8 +421,22 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
                     KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
             new EncryptedQueueRefiller());
     authToken = new DelegationTokenAuthenticatedURL.Token();
-    LOG.debug("KMSClientProvider for KMS url: {} delegation token service: {}" +
-        " created.", kmsUrl, dtService);
+    LOG.debug("KMSClientProvider created for KMS url: {} delegation token "
+            + "service: {} canonical service: {}.", kmsUrl, dtService,
+        canonicalService);
+  }
+
+  protected static Text getDtService(URI uri) {
+    Text service;
+    // remove fragment for forward compatibility with logical naming.
+    final String fragment = uri.getFragment();
+    if (fragment != null) {
+      service = new Text(
+          uri.getScheme() + ":" + uri.getSchemeSpecificPart());
+    } else {
+      service = new Text(uri.toString());
+    }
+    return service;
   }
 
   private static Path extractKMSPath(URI uri) throws MalformedURLException, IOException {
@@ -475,7 +508,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
         @Override
         public HttpURLConnection run() throws Exception {
           DelegationTokenAuthenticatedURL authUrl =
-              new DelegationTokenAuthenticatedURL(configurator);
+              createAuthenticatedURL();
           return authUrl.openConnection(url, authToken, doAsUser);
         }
       });
@@ -931,6 +964,96 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     return encKeyVersionQueue.getSize(keyName);
   }
 
+  // note: this is only a crutch for backwards compatibility.
+  // override the instance that will be used to select a token, intended
+  // to allow load balancing provider to find a token issued by any of its
+  // sub-providers.
+  protected void setClientTokenProvider(
+      KeyProviderDelegationTokenExtension.DelegationTokenExtension provider) {
+    clientTokenProvider = provider;
+  }
+
+  @VisibleForTesting
+  DelegationTokenAuthenticatedURL createAuthenticatedURL() {
+    return new DelegationTokenAuthenticatedURL(configurator) {
+      @Override
+      public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+          selectDelegationToken(URL url, Credentials creds) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Looking for delegation token. creds: {}",
+              creds.getAllTokens());
+        }
+        // clientTokenProvider is either "this" or a load balancing instance.
+        // if the latter, it will first look for the load balancer's uri
+        // service followed by each sub-provider for backwards-compatibility.
+        return clientTokenProvider.selectDelegationToken(creds);
+      }
+    };
+  }
+
+  @InterfaceAudience.Private
+  @Override
+  public Token<?> selectDelegationToken(Credentials creds) {
+    Token<?> token = selectDelegationToken(creds, dtService);
+    if (token == null) {
+      token = selectDelegationToken(creds, canonicalService);
+    }
+    return token;
+  }
+
+  protected static Token<?> selectDelegationToken(Credentials creds,
+                                                  Text service) {
+    Token<?> token = creds.getToken(service);
+    LOG.debug("selected by alias={} token={}", service, token);
+    if (token != null && TOKEN_KIND.equals(token.getKind())) {
+      return token;
+    }
+    token = TokenSelector.INSTANCE.selectToken(service, creds.getAllTokens());
+    LOG.debug("selected by service={} token={}", service, token);
+    return token;
+  }
+
+  @Override
+  public String getCanonicalServiceName() {
+    return canonicalService.toString();
+  }
+
+  @Override
+  public Token<?> getDelegationToken(final String renewer) throws IOException {
+    final URL url = createURL(null, null, null, null);
+    final DelegationTokenAuthenticatedURL authUrl =
+        new DelegationTokenAuthenticatedURL(configurator);
+    Token<?> token = null;
+    try {
+      final String doAsUser = getDoAsUser();
+      token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
+        @Override
+        public Token<?> run() throws Exception {
+          // Not using the cached token here.. Creating a new token here
+          // everytime.
+          LOG.debug("Getting new token from {}, renewer:{}", url, renewer);
+          return authUrl.getDelegationToken(url,
+              new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
+        }
+      });
+      if (token != null) {
+        token.setService(dtService);
+        LOG.info("New token created: ({})", token);
+      } else {
+        throw new IOException("Got NULL as delegation token");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      } else {
+        throw new IOException(e);
+      }
+    }
+    return token;
+  }
+
   @Override
   public long renewDelegationToken(final Token<?> dToken) throws IOException {
     try {
@@ -941,7 +1064,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
       LOG.debug("Renewing delegation token {} with url:{}, as:{}",
           token, url, doAsUser);
       final DelegationTokenAuthenticatedURL authUrl =
-          new DelegationTokenAuthenticatedURL(configurator);
+          createAuthenticatedURL();
       return getActualUgi().doAs(
           new PrivilegedExceptionAction<Long>() {
             @Override
@@ -973,7 +1096,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
               LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
                   dToken, url, doAsUser);
               final DelegationTokenAuthenticatedURL authUrl =
-                  new DelegationTokenAuthenticatedURL(configurator);
+                  createAuthenticatedURL();
               authUrl.cancelDelegationToken(url, token, doAsUser);
               return null;
             }
@@ -1025,47 +1148,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     return token;
   }
 
-  @Override
-  public Token<?>[] addDelegationTokens(final String renewer,
-      Credentials credentials) throws IOException {
-    Token<?>[] tokens = null;
-    Token<?> token = credentials.getToken(dtService);
-    if (token == null) {
-      final URL url = createURL(null, null, null, null);
-      final DelegationTokenAuthenticatedURL authUrl =
-          new DelegationTokenAuthenticatedURL(configurator);
-      try {
-        final String doAsUser = getDoAsUser();
-        token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
-          @Override
-          public Token<?> run() throws Exception {
-            // Not using the cached token here.. Creating a new token here
-            // everytime.
-            LOG.info("Getting new token from {}, renewer:{}", url, renewer);
-            return authUrl.getDelegationToken(url,
-                new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
-          }
-        });
-        if (token != null) {
-          LOG.info("New token received: ({})", token);
-          credentials.addToken(token.getService(), token);
-          tokens = new Token<?>[] { token };
-        } else {
-          throw new IOException("Got NULL as delegation token");
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      } catch (Exception e) {
-        if (e instanceof IOException) {
-          throw (IOException) e;
-        } else {
-          throw new IOException(e);
-        }
-      }
-    }
-    return tokens;
-  }
-
   private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
     // Add existing credentials from the UGI, since provider is cached.
     Credentials creds = ugi.getCredentials();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
index e68e844..6cb2cdc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.crypto.key.kms;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.ConnectException;
+import java.net.URI;
 import java.security.GeneralSecurityException;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
@@ -36,12 +37,15 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.KMSUtil;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,19 +80,42 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
 
   private final KMSClientProvider[] providers;
   private final AtomicInteger currentIdx;
+  private final Text dtService; // service in token.
+  private final Text canonicalService; // credentials alias for token.
 
   private RetryPolicy retryPolicy = null;
 
-  public LoadBalancingKMSClientProvider(KMSClientProvider[] providers,
-      Configuration conf) {
-    this(shuffle(providers), Time.monotonicNow(), conf);
+  public LoadBalancingKMSClientProvider(URI providerUri,
+      KMSClientProvider[] providers, Configuration conf) {
+    this(providerUri, providers, Time.monotonicNow(), conf);
   }
 
   @VisibleForTesting
   LoadBalancingKMSClientProvider(KMSClientProvider[] providers, long seed,
       Configuration conf) {
+    this(URI.create("kms://testing"), providers, seed, conf);
+  }
+
+  private LoadBalancingKMSClientProvider(URI uri,
+      KMSClientProvider[] providers, long seed, Configuration conf) {
     super(conf);
-    this.providers = providers;
+    // uri is the token service so it can be instantiated for renew/cancel.
+    dtService = KMSClientProvider.getDtService(uri);
+    // if provider not in conf, new client will alias on uri else addr.
+    if (KMSUtil.getKeyProviderUri(conf) == null) {
+      canonicalService = dtService;
+    } else {
+      // canonical service (credentials alias) will be the first underlying
+      // provider's service.  must be deterministic before shuffle so multiple
+      // calls for a token do not obtain another unnecessary token.
+      canonicalService = new Text(providers[0].getCanonicalServiceName());
+    }
+
+    // shuffle unless seed is 0 which is used by tests for determinism.
+    this.providers = (seed != 0) ? shuffle(providers) : providers;
+    for (KMSClientProvider provider : providers) {
+      provider.setClientTokenProvider(this);
+    }
     this.currentIdx = new AtomicInteger((int)(seed % providers.length));
     int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic.
         KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length);
@@ -106,6 +133,9 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
     this.retryPolicy = RetryPolicies.failoverOnNetworkException(
         RetryPolicies.TRY_ONCE_THEN_FAIL, maxNumRetries, 0, sleepBaseMillis,
         sleepMaxMillis);
+    LOG.debug("Created LoadBalancingKMSClientProvider for KMS url: {} with {} "
+            + "providers. delegation token service: {}, canonical service: {}",
+        uri, providers.length, dtService, canonicalService);
   }
 
   @VisibleForTesting
@@ -113,6 +143,23 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
     return providers;
   }
 
+  @Override
+  public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+      selectDelegationToken(Credentials creds) {
+    Token<? extends TokenIdentifier> token =
+        KMSClientProvider.selectDelegationToken(creds, canonicalService);
+    // fallback to querying each sub-provider.
+    if (token == null) {
+      for (KMSClientProvider provider : getProviders()) {
+        token = provider.selectDelegationToken(creds);
+        if (token != null) {
+          break;
+        }
+      }
+    }
+    return token;
+  }
+
   private <T> T doOp(ProviderCallable<T> op, int currPos,
       boolean isIdempotent) throws IOException {
     if (providers.length == 0) {
@@ -193,13 +240,21 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
   }
 
   @Override
-  public Token<?>[]
-      addDelegationTokens(final String renewer, final Credentials credentials)
-          throws IOException {
-    return doOp(new ProviderCallable<Token<?>[]>() {
+  public String getCanonicalServiceName() {
+    return canonicalService.toString();
+  }
+
+  @Override
+  public Token<?> getDelegationToken(String renewer) throws IOException {
+    return doOp(new ProviderCallable<Token<?>>() {
       @Override
-      public Token<?>[] call(KMSClientProvider provider) throws IOException {
-        return provider.addDelegationTokens(renewer, credentials);
+      public Token<?> call(KMSClientProvider provider) throws IOException {
+        Token<?> token = provider.getDelegationToken(renewer);
+        // override sub-providers service with our own so it can be used
+        // across all providers.
+        token.setService(dtService);
+        LOG.debug("New token service set. Token: ({})", token);
+        return token;
       }
     }, nextIdx(), false);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 66b6d44..3d40b6a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -58,13 +58,13 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
 import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
@@ -121,7 +121,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
 @SuppressWarnings("DeprecatedIsStillUsed")
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public abstract class FileSystem extends Configured implements Closeable {
+public abstract class FileSystem extends Configured
+    implements Closeable, DelegationTokenIssuer {
   public static final String FS_DEFAULT_NAME_KEY =
                    CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
   public static final String DEFAULT_FS =
@@ -386,6 +387,7 @@ public abstract class FileSystem extends Configured implements Closeable {
    */
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
+  @Override
   public String getCanonicalServiceName() {
     return (getChildFileSystems() == null)
       ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
@@ -600,72 +602,12 @@ public abstract class FileSystem extends Configured implements Closeable {
    * @throws IOException on any problem obtaining a token
    */
   @InterfaceAudience.Private()
+  @Override
   public Token<?> getDelegationToken(String renewer) throws IOException {
     return null;
   }
 
   /**
-   * Obtain all delegation tokens used by this FileSystem that are not
-   * already present in the given Credentials. Existing tokens will neither
-   * be verified as valid nor having the given renewer.  Missing tokens will
-   * be acquired and added to the given Credentials.
-   *
-   * Default Impl: works for simple FS with its own token
-   * and also for an embedded FS whose tokens are those of its
-   * child FileSystems (i.e. the embedded FS has no tokens of its own).
-   *
-   * @param renewer the user allowed to renew the delegation tokens
-   * @param credentials cache in which to add new delegation tokens
-   * @return list of new delegation tokens
-   * @throws IOException problems obtaining a token
-   */
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public Token<?>[] addDelegationTokens(
-      final String renewer, Credentials credentials) throws IOException {
-    if (credentials == null) {
-      credentials = new Credentials();
-    }
-    final List<Token<?>> tokens = new ArrayList<>();
-    collectDelegationTokens(renewer, credentials, tokens);
-    return tokens.toArray(new Token<?>[tokens.size()]);
-  }
-
-  /**
-   * Recursively obtain the tokens for this FileSystem and all descendant
-   * FileSystems as determined by {@link #getChildFileSystems()}.
-   * @param renewer the user allowed to renew the delegation tokens
-   * @param credentials cache in which to add the new delegation tokens
-   * @param tokens list in which to add acquired tokens
-   * @throws IOException problems obtaining a token
-   */
-  private void collectDelegationTokens(final String renewer,
-                                       final Credentials credentials,
-                                       final List<Token<?>> tokens)
-                                           throws IOException {
-    final String serviceName = getCanonicalServiceName();
-    // Collect token of the this filesystem and then of its embedded children
-    if (serviceName != null) { // fs has token, grab it
-      final Text service = new Text(serviceName);
-      Token<?> token = credentials.getToken(service);
-      if (token == null) {
-        token = getDelegationToken(renewer);
-        if (token != null) {
-          tokens.add(token);
-          credentials.addToken(service, token);
-        }
-      }
-    }
-    // Now collect the tokens from the children
-    final FileSystem[] children = getChildFileSystems();
-    if (children != null) {
-      for (final FileSystem fs : children) {
-        fs.collectDelegationTokens(renewer, credentials, tokens);
-      }
-    }
-  }
-
-  /**
    * Get all the immediate child FileSystems embedded in this FileSystem.
    * It does not recurse and get grand children.  If a FileSystem
    * has multiple child FileSystems, then it must return a unique list
@@ -680,6 +622,13 @@ public abstract class FileSystem extends Configured implements Closeable {
     return null;
   }
 
+  @InterfaceAudience.Private
+  @Override
+  public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+      throws IOException {
+    return getChildFileSystems();
+  }
+
   /**
    * Create a file with the provided permission.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
index 35589a2..4e9881b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
@@ -296,15 +296,11 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
       Credentials creds = UserGroupInformation.getCurrentUser().
           getCredentials();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Token not set, looking for delegation token. Creds:{}",
-            creds.getAllTokens());
+        LOG.debug("Token not set, looking for delegation token. Creds:{},"
+                + " size:{}", creds.getAllTokens(), creds.numberOfTokens());
       }
       if (!creds.getAllTokens().isEmpty()) {
-        InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
-            url.getPort());
-        Text service = SecurityUtil.buildTokenService(serviceAddr);
-        dToken = creds.getToken(service);
-        LOG.debug("Using delegation token {} from service:{}", dToken, service);
+        dToken = selectDelegationToken(url, creds);
         if (dToken != null) {
           if (useQueryStringForDelegationToken()) {
             // delegation token will go in the query string, injecting it
@@ -341,6 +337,21 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
   }
 
   /**
+   * Select a delegation token from all tokens in credentials, based on url.
+   */
+  @InterfaceAudience.Private
+  public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+      selectDelegationToken(URL url, Credentials creds) {
+    final InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
+        url.getPort());
+    final Text service = SecurityUtil.buildTokenService(serviceAddr);
+    org.apache.hadoop.security.token.Token<? extends TokenIdentifier> dToken =
+        creds.getToken(service);
+    LOG.debug("Using delegation token {} from service:{}", dToken, service);
+    return dToken;
+  }
+
+  /**
    * Requests a delegation token using the configured <code>Authenticator</code>
    * for authentication.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java
new file mode 100644
index 0000000..90e72b9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.token.org.apache.hadoop.security.token;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class for issuing delegation tokens.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "Yarn"})
+@InterfaceStability.Unstable
+public interface DelegationTokenIssuer {
+
+  /**
+   * The service name used as the alias for the  token in the credential
+   * token map.  addDelegationTokens will use this to determine if
+   * a token exists, and if not, add a new token with this alias.
+   */
+  String getCanonicalServiceName();
+
+  /**
+   * Unconditionally get a new token with the optional renewer.  Returning
+   * null indicates the service does not issue tokens.
+   */
+  Token<?> getDelegationToken(String renewer) throws IOException;
+
+  /**
+   * Issuers may need tokens from additional services.
+   */
+  default DelegationTokenIssuer[] getAdditionalTokenIssuers()
+      throws IOException {
+    return null;
+  }
+
+  /**
+   * Given a renewer, add delegation tokens for issuer and it's child issuers
+   * to the <code>Credentials</code> object if it is not already present.
+   *<p>
+   * Note: This method is not intended to be overridden.  Issuers should
+   * implement getCanonicalService and getDelegationToken to ensure
+   * consistent token acquisition behavior.
+   *
+   * @param renewer the user allowed to renew the delegation tokens
+   * @param credentials cache in which to add new delegation tokens
+   * @return list of new delegation tokens
+   * @throws IOException thrown if IOException if an IO error occurs.
+   */
+  default Token<?>[] addDelegationTokens(
+      final String renewer, Credentials credentials) throws IOException {
+    if (credentials == null) {
+      credentials = new Credentials();
+    }
+    final List<Token<?>> tokens = new ArrayList<>();
+    collectDelegationTokens(this, renewer, credentials, tokens);
+    return tokens.toArray(new Token<?>[tokens.size()]);
+  }
+
+  /**
+   * NEVER call this method directly.
+   */
+  @InterfaceAudience.Private
+  static void collectDelegationTokens(
+      final DelegationTokenIssuer issuer,
+      final String renewer,
+      final Credentials credentials,
+      final List<Token<?>> tokens) throws IOException {
+    final String serviceName = issuer.getCanonicalServiceName();
+    // Collect token of the this issuer and then of its embedded children
+    if (serviceName != null) {
+      final Text service = new Text(serviceName);
+      Token<?> token = credentials.getToken(service);
+      if (token == null) {
+        token = issuer.getDelegationToken(renewer);
+        if (token != null) {
+          tokens.add(token);
+          credentials.addToken(service, token);
+        }
+      }
+    }
+    // Now collect the tokens from the children.
+    final DelegationTokenIssuer[] ancillary =
+        issuer.getAdditionalTokenIssuers();
+    if (ancillary != null) {
+      for (DelegationTokenIssuer subIssuer : ancillary) {
+        collectDelegationTokens(subIssuer, renewer, credentials, tokens);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
index c96c6fb..5b48da1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
@@ -59,12 +59,23 @@ public final class KMSUtil {
   public static KeyProvider createKeyProvider(final Configuration conf,
       final String configKeyName) throws IOException {
     LOG.debug("Creating key provider with config key {}", configKeyName);
+    URI uri = getKeyProviderUri(conf, configKeyName);
+    return (uri != null) ? createKeyProviderFromUri(conf, uri) : null;
+  }
+
+  public static URI getKeyProviderUri(final Configuration conf) {
+    return KMSUtil.getKeyProviderUri(
+        conf, KeyProviderFactory.KEY_PROVIDER_PATH);
+  }
+
+  public static URI getKeyProviderUri(final Configuration conf,
+                                      final String configKeyName) {
     final String providerUriStr = conf.getTrimmed(configKeyName);
     // No provider set in conf
     if (providerUriStr == null || providerUriStr.isEmpty()) {
       return null;
     }
-    return createKeyProviderFromUri(conf, URI.create(providerUriStr));
+    return URI.create(providerUriStr);
   }
 
   public static KeyProvider createKeyProviderFromUri(final Configuration conf,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java
index df5d3e8..4fabc5b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java
@@ -51,23 +51,27 @@ public class TestKeyProviderDelegationTokenExtension {
         KeyProviderDelegationTokenExtension
         .createKeyProviderDelegationTokenExtension(kp);
     Assert.assertNotNull(kpDTE1);
-    // Default implementation should be a no-op and return null
-    Assert.assertNull(kpDTE1.addDelegationTokens("user", credentials));
+    Token<?>[] tokens = kpDTE1.addDelegationTokens("user", credentials);
+    // Default implementation should return no tokens.
+    Assert.assertNotNull(tokens);
+    Assert.assertEquals(0, tokens.length);
     
     MockKeyProvider mock = mock(MockKeyProvider.class);
     Mockito.when(mock.getConf()).thenReturn(new Configuration());
-    when(mock.addDelegationTokens("renewer", credentials)).thenReturn(
-        new Token<?>[]{new Token(null, null, new Text("kind"), new Text(
-            "service"))}
+    when(mock.getCanonicalServiceName()).thenReturn("cservice");
+    when(mock.getDelegationToken("renewer")).thenReturn(
+        new Token(null, null, new Text("kind"), new Text(
+            "tservice"))
     );
     KeyProviderDelegationTokenExtension kpDTE2 =
         KeyProviderDelegationTokenExtension
         .createKeyProviderDelegationTokenExtension(mock);
-    Token<?>[] tokens = 
-        kpDTE2.addDelegationTokens("renewer", credentials);
+    tokens = kpDTE2.addDelegationTokens("renewer", credentials);
     Assert.assertNotNull(tokens);
+    Assert.assertEquals(1, tokens.length);
     Assert.assertEquals("kind", tokens[0].getKind().toString());
-    
+    Assert.assertEquals("tservice", tokens[0].getService().toString());
+    Assert.assertNotNull(credentials.getToken(new Text("cservice")));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
new file mode 100644
index 0000000..b87f45a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit test for {@link KMSClientProvider} class.
+ */
+public class TestKMSClientProvider {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestKMSClientProvider.class);
+
+  private final Token token = new Token();
+  private final Token oldToken = new Token();
+  private final String urlString = "https://host:16000/kms";
+  private final String providerUriString = "kms://https@host:16000/kms";
+  private final String oldTokenService = "host:16000";
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(60000);
+
+  {
+    GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
+  }
+
+  @Before
+  public void setup() {
+    SecurityUtil.setTokenServiceUseIp(false);
+    token.setKind(TOKEN_KIND);
+    token.setService(new Text(providerUriString));
+    oldToken.setKind(TOKEN_KIND);
+    oldToken.setService(new Text(oldTokenService));
+  }
+
+  @Test
+  public void testSelectDelegationToken() throws Exception {
+    final Credentials creds = new Credentials();
+    creds.addToken(new Text(providerUriString), token);
+    assertNull(KMSClientProvider.selectDelegationToken(creds, null));
+    assertNull(KMSClientProvider
+        .selectDelegationToken(creds, new Text(oldTokenService)));
+    assertEquals(token, KMSClientProvider
+        .selectDelegationToken(creds, new Text(providerUriString)));
+  }
+
+  @Test
+  public void testSelectTokenOldService() throws Exception {
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(providerUriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf);
+    try {
+      final Credentials creds = new Credentials();
+      creds.addToken(new Text(oldTokenService), oldToken);
+      final Token t = kp.selectDelegationToken(creds);
+      assertEquals(oldToken, t);
+    } finally {
+      kp.close();
+    }
+  }
+
+  @Test
+  public void testSelectTokenWhenBothExist() throws Exception {
+    final Credentials creds = new Credentials();
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(providerUriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf);
+    try {
+      creds.addToken(token.getService(), token);
+      creds.addToken(oldToken.getService(), oldToken);
+      final Token t = kp.selectDelegationToken(creds);
+      assertEquals("new token should be selected when both exist", token, t);
+    } finally {
+      kp.close();
+    }
+  }
+
+  @Test
+  public void testURLSelectTokenUriFormat() throws Exception {
+    testURLSelectToken(token);
+  }
+
+  @Test
+  public void testURLSelectTokenIpPort() throws Exception {
+    testURLSelectToken(oldToken);
+  }
+
+  private void testURLSelectToken(final Token tok)
+      throws URISyntaxException, IOException {
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(providerUriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf);
+    final DelegationTokenAuthenticatedURL url = kp.createAuthenticatedURL();
+    final Credentials creds = new Credentials();
+    creds.addToken(tok.getService(), tok);
+    final Token chosen = url.selectDelegationToken(new URL(urlString), creds);
+    assertEquals(tok, chosen);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
index 058db92..c99c63d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.crypto.key.kms;
 import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -49,7 +50,6 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -68,33 +68,27 @@ public class TestLoadBalancingKMSClientProvider {
     SecurityUtil.setTokenServiceUseIp(false);
   }
 
-  @After
-  public void teardown() throws IOException {
-    KMSClientProvider.fallbackDefaultPortForTesting = false;
-  }
-
   @Test
   public void testCreation() throws Exception {
     Configuration conf = new Configuration();
-    KMSClientProvider.fallbackDefaultPortForTesting = true;
     KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
-        "kms://http@host1/kms/foo"), conf);
+        "kms://http@host1:9600/kms/foo"), conf);
     assertTrue(kp instanceof LoadBalancingKMSClientProvider);
     KMSClientProvider[] providers =
         ((LoadBalancingKMSClientProvider) kp).getProviders();
     assertEquals(1, providers.length);
-    assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"),
+    assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"),
         Sets.newHashSet(providers[0].getKMSUrl()));
 
     kp = new KMSClientProvider.Factory().createProvider(new URI(
-        "kms://http@host1;host2;host3/kms/foo"), conf);
+        "kms://http@host1;host2;host3:9600/kms/foo"), conf);
     assertTrue(kp instanceof LoadBalancingKMSClientProvider);
     providers =
         ((LoadBalancingKMSClientProvider) kp).getProviders();
     assertEquals(3, providers.length);
-    assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/",
-        "http://host2/kms/foo/v1/",
-        "http://host3/kms/foo/v1/"),
+    assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/",
+        "http://host2:9600/kms/foo/v1/",
+        "http://host3:9600/kms/foo/v1/"),
         Sets.newHashSet(providers[0].getKMSUrl(),
             providers[1].getKMSUrl(),
             providers[2].getKMSUrl()));
@@ -257,10 +251,9 @@ public class TestLoadBalancingKMSClientProvider {
   @Test
   public void testClassCastException() throws Exception {
     Configuration conf = new Configuration();
-    KMSClientProvider.fallbackDefaultPortForTesting = true;
     KMSClientProvider p1 = new MyKMSClientProvider(
-        new URI("kms://http@host1/kms/foo"), conf);
-    LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
+        new URI("kms://http@host1:9600/kms/foo"), conf);
+    LoadBalancingKMSClientProvider kp =   new LoadBalancingKMSClientProvider(
         new KMSClientProvider[] {p1}, 0, conf);
     try {
       kp.generateEncryptedKey("foo");
@@ -878,4 +871,42 @@ public class TestLoadBalancingKMSClientProvider {
     verify(kp.getProviders()[2], Mockito.times(1))
         .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
   }
+
+  @Test
+  public void testTokenServiceCreationWithLegacyFormat() throws Exception {
+    Configuration conf = new Configuration();
+    // Create keyprovider with old token format (ip:port)
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        "kms:/something");
+    String authority = "host1:9600";
+    URI kmsUri = URI.create("kms://http@" + authority + "/kms/foo");
+    KeyProvider kp =
+        new KMSClientProvider.Factory().createProvider(kmsUri, conf);
+    assertTrue(kp instanceof LoadBalancingKMSClientProvider);
+    LoadBalancingKMSClientProvider lbkp = (LoadBalancingKMSClientProvider) kp;
+    assertEquals(1, lbkp.getProviders().length);
+    assertEquals(authority, lbkp.getCanonicalServiceName());
+    for (KMSClientProvider provider : lbkp.getProviders()) {
+      assertEquals(authority, provider.getCanonicalServiceName());
+    }
+  }
+
+  @Test
+  public void testTokenServiceCreationWithUriFormat() throws Exception {
+    final Configuration conf = new Configuration();
+    final URI kmsUri = URI.create("kms://http@host1;host2;host3:9600/kms/foo");
+    final KeyProvider kp =
+        new KMSClientProvider.Factory().createProvider(kmsUri, conf);
+    assertTrue(kp instanceof LoadBalancingKMSClientProvider);
+    final LoadBalancingKMSClientProvider lbkp =
+        (LoadBalancingKMSClientProvider) kp;
+    assertEquals(kmsUri.toString(), lbkp.getCanonicalServiceName());
+    KMSClientProvider[] providers = lbkp.getProviders();
+    assertEquals(3, providers.length);
+    for (int i = 0; i < providers.length; i++) {
+      assertEquals(URI.create(providers[i].getKMSUrl()).getAuthority(),
+          providers[i].getCanonicalServiceName());
+      assertNotEquals(kmsUri, providers[i].getCanonicalServiceName());
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index 0e9a612..a766cfb4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
 import org.apache.hadoop.util.Progressable;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -125,6 +126,8 @@ public class TestFilterFileSystem {
     public int getDefaultPort();
     public String getCanonicalServiceName();
     public Token<?> getDelegationToken(String renewer) throws IOException;
+    public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+        throws IOException;
     public boolean deleteOnExit(Path f) throws IOException;
     public boolean cancelDeleteOnExit(Path f) throws IOException;
     public Token<?>[] addDelegationTokens(String renewer, Credentials creds)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index 1b69693..870a828 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Assert;
 import org.junit.Test;
@@ -145,6 +146,8 @@ public class TestHarFileSystem {
     public int getDefaultPort();
     public String getCanonicalServiceName();
     public Token<?> getDelegationToken(String renewer) throws IOException;
+    public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+        throws IOException;
     public FileChecksum getFileChecksum(Path f) throws IOException;
     public boolean deleteOnExit(Path f) throws IOException;
     public boolean cancelDeleteOnExit(Path f) throws IOException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
index d709ba8..af59877 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
 import org.apache.hadoop.crypto.key.kms.ValueQueue;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.Credentials;
@@ -44,6 +45,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.Whitebox;
@@ -96,6 +98,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -141,21 +145,78 @@ public class TestKMS {
   }
 
   public static abstract class KMSCallable<T> implements Callable<T> {
-    private URL kmsUrl;
+    private List<URL> kmsUrl;
 
     protected URL getKMSUrl() {
-      return kmsUrl;
+      return kmsUrl.get(0);
+    }
+
+    protected URL[] getKMSHAUrl() {
+      URL[] urls = new URL[kmsUrl.size()];
+      return kmsUrl.toArray(urls);
+    }
+
+    protected void addKMSUrl(URL url) {
+      if (kmsUrl == null) {
+        kmsUrl = new ArrayList<URL>();
+      }
+      kmsUrl.add(url);
+    }
+
+    /*
+     * The format of the returned value will be
+     * kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2
+     */
+    protected String generateLoadBalancingKeyProviderUriString() {
+      if (kmsUrl == null || kmsUrl.size() == 0) {
+        return null;
+      }
+      StringBuffer sb = new StringBuffer();
+
+      for (int i = 0; i < kmsUrl.size(); i++) {
+        sb.append(KMSClientProvider.SCHEME_NAME + "://" +
+            kmsUrl.get(0).getProtocol() + "@");
+        URL url = kmsUrl.get(i);
+        sb.append(url.getAuthority());
+        if (url.getPath() != null) {
+          sb.append(url.getPath());
+        }
+        if (i < kmsUrl.size() - 1) {
+          sb.append(",");
+        }
+      }
+      return sb.toString();
     }
   }
 
   protected KeyProvider createProvider(URI uri, Configuration conf)
       throws IOException {
-    final KeyProvider ret = new LoadBalancingKMSClientProvider(
+    final KeyProvider ret = new LoadBalancingKMSClientProvider(uri,
         new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf);
     providersCreated.add(ret);
     return ret;
   }
 
+  /**
+  * create a LoadBalancingKMSClientProvider from an array of URIs.
+  * @param uris an array of KMS URIs
+  * @param conf configuration object
+  * @return a LoadBalancingKMSClientProvider object
+  * @throws IOException
+  */
+  protected LoadBalancingKMSClientProvider createHAProvider(URI lbUri,
+      URI[] uris, Configuration conf) throws IOException {
+    KMSClientProvider[] providers = new KMSClientProvider[uris.length];
+    for (int i = 0; i < providers.length; i++) {
+      providers[i] =
+          new KMSClientProvider(uris[i], conf);
+    }
+    final LoadBalancingKMSClientProvider ret =
+        new LoadBalancingKMSClientProvider(lbUri, providers, conf);
+    providersCreated.add(ret);
+    return ret;
+  }
+
   private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf)
       throws IOException {
     final KMSClientProvider ret = new KMSClientProvider(uri, conf);
@@ -170,22 +231,34 @@ public class TestKMS {
 
   protected <T> T runServer(int port, String keystore, String password, File confDir,
       KMSCallable<T> callable) throws Exception {
+    return runServer(new int[] {port}, keystore, password, confDir, callable);
+  }
+
+  protected <T> T runServer(int[] ports, String keystore, String password,
+      File confDir, KMSCallable<T> callable) throws Exception {
     MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
         .setLog4jConfFile("log4j.properties");
     if (keystore != null) {
       miniKMSBuilder.setSslConf(new File(keystore), password);
     }
-    if (port > 0) {
-      miniKMSBuilder.setPort(port);
+    final List<MiniKMS> kmsList = new ArrayList<>();
+
+    for (int i = 0; i < ports.length; i++) {
+      if (ports[i] > 0) {
+        miniKMSBuilder.setPort(ports[i]);
+      }
+      MiniKMS miniKMS = miniKMSBuilder.build();
+      kmsList.add(miniKMS);
+      miniKMS.start();
+      LOG.info("Test KMS running at: " + miniKMS.getKMSUrl());
+      callable.addKMSUrl(miniKMS.getKMSUrl());
     }
-    MiniKMS miniKMS = miniKMSBuilder.build();
-    miniKMS.start();
     try {
-      System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
-      callable.kmsUrl = miniKMS.getKMSUrl();
       return callable.call();
     } finally {
-      miniKMS.stop();
+      for (MiniKMS miniKMS: kmsList) {
+        miniKMS.stop();
+      }
     }
   }
 
@@ -240,6 +313,13 @@ public class TestKMS {
     return new URI("kms://" + str);
   }
 
+  public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception {
+    URI[] uris = new URI[kmsUrls.length];
+    for (int i=0; i< kmsUrls.length; i++) {
+      uris[i] = createKMSUri(kmsUrls[i]);
+    }
+    return uris;
+  }
 
   private static class KerberosConfiguration
       extends javax.security.auth.login.Configuration {
@@ -306,6 +386,7 @@ public class TestKMS {
     principals.add("otheradmin");
     principals.add("client/host");
     principals.add("client1");
+    principals.add("foo");
     for (KMSACLs.Type type : KMSACLs.Type.values()) {
       principals.add(type.toString());
     }
@@ -2011,7 +2092,6 @@ public class TestKMS {
             return null;
           }
         });
-
         nonKerberosUgi.addCredentials(credentials);
 
         try {
@@ -2067,6 +2147,18 @@ public class TestKMS {
     testDelegationTokensOps(true, true);
   }
 
+  private Text getTokenService(KeyProvider provider) {
+    assertTrue("KeyProvider should be an instance of " +
+        "LoadBalancingKMSClientProvider", (provider instanceof
+            LoadBalancingKMSClientProvider));
+    assertEquals("Num client providers should be 1", 1,
+        ((LoadBalancingKMSClientProvider)provider).getProviders().length);
+    final Text tokenService = new Text(
+        (((LoadBalancingKMSClientProvider)provider).getProviders()[0])
+        .getCanonicalServiceName());
+    return tokenService;
+  }
+
   private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
       throws Exception {
     final File confDir = getTestDir();
@@ -2103,6 +2195,10 @@ public class TestKMS {
           @Override
           public Void run() throws Exception {
             KeyProvider kp = createProvider(uri, clientConf);
+            // Unset the conf value for key provider path just to be sure that
+            // the key provider created for renew and cancel token is from
+            // token service field.
+            clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
             // test delegation token retrieval
             KeyProviderDelegationTokenExtension kpdte =
                 KeyProviderDelegationTokenExtension.
@@ -2110,13 +2206,10 @@ public class TestKMS {
             final Credentials credentials = new Credentials();
             final Token<?>[] tokens =
                 kpdte.addDelegationTokens("client1", credentials);
+            Text tokenService = getTokenService(kp);
             Assert.assertEquals(1, credentials.getAllTokens().size());
-            InetSocketAddress kmsAddr =
-                new InetSocketAddress(getKMSUrl().getHost(),
-                    getKMSUrl().getPort());
             Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
-                credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
-                    getKind());
+                credentials.getToken(tokenService).getKind());
 
             // Test non-renewer user cannot renew.
             for (Token<?> token : tokens) {
@@ -2258,15 +2351,16 @@ public class TestKMS {
             // Get a DT and use it.
             final Credentials credentials = new Credentials();
             kpdte.addDelegationTokens("client", credentials);
+            Text tokenService = getTokenService(kp);
             Assert.assertEquals(1, credentials.getAllTokens().size());
             Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials.
-                getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
+                getToken(tokenService).getKind());
             UserGroupInformation.getCurrentUser().addCredentials(credentials);
             LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
                 getCurrentUser().getCredentials().getAllTokens());
             Token<?> token =
                 UserGroupInformation.getCurrentUser().getCredentials()
-                    .getToken(SecurityUtil.buildTokenService(kmsAddr));
+                    .getToken(tokenService);
             Assert.assertNotNull(token);
             job1Token.add(token);
 
@@ -2302,17 +2396,17 @@ public class TestKMS {
             // Get a new DT, but don't use it yet.
             final Credentials newCreds = new Credentials();
             kpdte.addDelegationTokens("client", newCreds);
+            Text tokenService = getTokenService(kp);
             Assert.assertEquals(1, newCreds.getAllTokens().size());
             Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
-                newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
+                newCreds.getToken(tokenService).
                     getKind());
 
             // Using job 1's DT should fail.
             final Credentials oldCreds = new Credentials();
             for (Token<?> token : job1Token) {
               if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) {
-                oldCreds
-                    .addToken(SecurityUtil.buildTokenService(kmsAddr), token);
+                oldCreds.addToken(tokenService, token);
               }
             }
             UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
@@ -2328,7 +2422,7 @@ public class TestKMS {
             // Using the new DT should succeed.
             Assert.assertEquals(1, newCreds.getAllTokens().size());
             Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
-                newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
+                newCreds.getToken(tokenService).
                     getKind());
             UserGroupInformation.getCurrentUser().addCredentials(newCreds);
             LOG.info("Credetials now are: {}", UserGroupInformation
@@ -2357,7 +2451,14 @@ public class TestKMS {
     doKMSWithZK(true, true);
   }
 
-  public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
+  private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
+      KMSCallable<T> callable) throws Exception {
+    return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1);
+  }
+
+  private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
+      KMSCallable<T> callable, int kmsSize) throws Exception {
+
     TestingServer zkServer = null;
     try {
       zkServer = new TestingServer();
@@ -2403,43 +2504,189 @@ public class TestKMS {
 
       writeConf(testDir, conf);
 
-      KMSCallable<KeyProvider> c =
-          new KMSCallable<KeyProvider>() {
-        @Override
-        public KeyProvider call() throws Exception {
-          final Configuration conf = new Configuration();
-          conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
-          final URI uri = createKMSUri(getKMSUrl());
-
-          final KeyProvider kp =
-              doAs("SET_KEY_MATERIAL",
-                  new PrivilegedExceptionAction<KeyProvider>() {
-                    @Override
-                    public KeyProvider run() throws Exception {
-                      KeyProvider kp = createProvider(uri, conf);
-                          kp.createKey("k1", new byte[16],
-                              new KeyProvider.Options(conf));
-                          kp.createKey("k2", new byte[16],
-                              new KeyProvider.Options(conf));
-                          kp.createKey("k3", new byte[16],
-                              new KeyProvider.Options(conf));
-                      return kp;
-                    }
-                  });
-          return kp;
-        }
-      };
-
-      runServer(null, null, testDir, c);
+      int[] ports = new int[kmsSize];
+      for (int i = 0; i < ports.length; i++) {
+        ports[i] = -1;
+      }
+      return runServer(ports, null, null, testDir, callable);
     } finally {
       if (zkServer != null) {
         zkServer.stop();
         zkServer.close();
       }
     }
+  }
 
+  public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
+    KMSCallable<KeyProvider> c =
+        new KMSCallable<KeyProvider>() {
+          @Override
+          public KeyProvider call() throws Exception {
+            final Configuration conf = new Configuration();
+            conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+            final URI uri = createKMSUri(getKMSUrl());
+
+            final KeyProvider kp =
+                doAs("SET_KEY_MATERIAL",
+                    new PrivilegedExceptionAction<KeyProvider>() {
+                      @Override
+                      public KeyProvider run() throws Exception {
+                        KeyProvider kp = createProvider(uri, conf);
+                        kp.createKey("k1", new byte[16],
+                            new KeyProvider.Options(conf));
+                        kp.createKey("k2", new byte[16],
+                            new KeyProvider.Options(conf));
+                        kp.createKey("k3", new byte[16],
+                            new KeyProvider.Options(conf));
+                        return kp;
+                      }
+                    });
+            return kp;
+          }
+        };
+
+    runServerWithZooKeeper(zkDTSM, zkSigner, c);
+  }
+
+  @Test
+  public void testKMSHAZooKeeperDelegationToken() throws Exception {
+    final int kmsSize = 2;
+    doKMSWithZKWithDelegationToken(true, true, kmsSize);
   }
 
+  private void doKMSWithZKWithDelegationToken(boolean zkDTSM, boolean zkSigner,
+      int kmsSize) throws Exception {
+    // Create a KMSCallable to execute requests after ZooKeeper and KMS are up.
+    KMSCallable<Void> c = new KMSCallable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final Configuration conf = new Configuration();
+        conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+        final URI[] uris = createKMSHAUri(getKMSHAUrl());
+        final Credentials credentials = new Credentials();
+        // Create a UGI without Kerberos auth. It will authenticate with tokens.
+        final UserGroupInformation nonKerberosUgi =
+            UserGroupInformation.getCurrentUser();
+        final String lbUri = generateLoadBalancingKeyProviderUriString();
+        final LoadBalancingKMSClientProvider lbkp =
+            createHAProvider(URI.create(lbUri), uris, conf);
+        conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
+        // get delegation tokens using kerberos login
+        doAs("SET_KEY_MATERIAL",
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                KeyProviderDelegationTokenExtension kpdte =
+                    KeyProviderDelegationTokenExtension.
+                        createKeyProviderDelegationTokenExtension(lbkp);
+                kpdte.addDelegationTokens("foo", credentials);
+                return null;
+              }
+            });
+
+        nonKerberosUgi.addCredentials(credentials);
+        // Access KMS using delegation token for authentication, no Kerberos.
+        nonKerberosUgi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            // Create a kms client with one provider at a time. Must use one
+            // provider so that if it fails to authenticate, it does not fall
+            // back to the next KMS instance.
+            // Should succeed because it has delegation tokens for any instance.
+            int i = 0;
+            for (KMSClientProvider provider : lbkp.getProviders()) {
+              final String key = "k" + i++;
+              LOG.info("Connect to {} to create key {}.", provider, key);
+              provider.createKey(key, new KeyProvider.Options(conf));
+            }
+            return null;
+          }
+        });
+
+        final Collection<Token<? extends TokenIdentifier>> tokens =
+            credentials.getAllTokens();
+        doAs("foo", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            assertEquals(1, tokens.size());
+            Token token = tokens.iterator().next();
+            assertEquals(KMSDelegationToken.TOKEN_KIND, token.getKind());
+            LOG.info("Got dt for token: {}", token);
+            final long tokenLife = token.renew(conf);
+            LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife);
+            Thread.sleep(10);
+            final long newTokenLife = token.renew(conf);
+            LOG.info("Renewed token {}, new lifetime:{}", token, newTokenLife);
+            assertTrue(newTokenLife > tokenLife);
+
+            // test delegation token cancellation
+            LOG.info("Got dt for token: {}", token);
+            token.cancel(conf);
+            LOG.info("Cancelled token {}", token);
+            try {
+              token.renew(conf);
+              fail("should not be able to renew a canceled token");
+            } catch (Exception e) {
+              LOG.info("Expected exception when renewing token", e);
+            }
+            return null;
+          }
+        });
+
+        final Credentials newCredentials = new Credentials();
+        doAs("SET_KEY_MATERIAL",
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                KeyProviderDelegationTokenExtension kpdte =
+                    KeyProviderDelegationTokenExtension.
+                        createKeyProviderDelegationTokenExtension(lbkp);
+                kpdte.addDelegationTokens("foo", newCredentials);
+                return null;
+              }
+            });
+
+        doAs("foo", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            KMSClientProvider kp1 = lbkp.getProviders()[0];
+            URL[] urls = getKMSHAUrl();
+            final Collection<Token<? extends TokenIdentifier>> tokens =
+                newCredentials.getAllTokens();
+            assertEquals(1, tokens.size());
+            Token token = tokens.iterator().next();
+            assertEquals(KMSDelegationToken.TOKEN_KIND,
+                token.getKind());
+            // Testing backward compatibility of token renewal and cancellation.
+            // Set the token service to ip:port format and test to renew/cancel.
+            Text text = SecurityUtil.buildTokenService(
+                new InetSocketAddress(urls[0].getHost(), urls[0].getPort()));
+            token.setService(text);
+            conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri);
+            long tokenLife = 0L;
+            for (KMSClientProvider kp : lbkp.getProviders()) {
+              long renewedTokenLife = token.renew(conf);
+              LOG.info("Renewed token of kind {}, new lifetime:{}",
+                  token.getKind(), renewedTokenLife);
+              assertTrue(renewedTokenLife > tokenLife);
+              tokenLife = renewedTokenLife;
+              Thread.sleep(10);
+            }
+            token.cancel(conf);
+            try {
+              token.renew(conf);
+              fail("should not be able to renew a canceled token");
+            } catch (IOException e) {
+              LOG.info("Expected exception when renewing token", e);
+            }
+            return null;
+          }
+        });
+        return null;
+      }
+    };
+    runServerWithZooKeeper(zkDTSM, zkSigner, c, kmsSize);
+  }
 
   @Test
   public void testProxyUserKerb() throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 5511657..ce1083d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -205,7 +206,7 @@ import com.google.common.net.InetAddresses;
  ********************************************************/
 @InterfaceAudience.Private
 public class DFSClient implements java.io.Closeable, RemotePeerFactory,
-    DataEncryptionKeyFactory {
+    DataEncryptionKeyFactory, KeyProviderTokenIssuer {
   public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
 
   private final Configuration conf;
@@ -684,6 +685,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return (dtService != null) ? dtService.toString() : null;
   }
 
+  @Override
+  public Token<?>getDelegationToken(String renewer) throws IOException {
+    return getDelegationToken(renewer == null ? null : new Text(renewer));
+  }
+
   /**
    * @see ClientProtocol#getDelegationToken(Text)
    */
@@ -3029,7 +3035,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return HEDGED_READ_METRIC;
   }
 
-  URI getKeyProviderUri() throws IOException {
+  @Override
+  public URI getKeyProviderUri() throws IOException {
     return HdfsKMSUtil.getKeyProviderUri(ugi, namenodeUri,
         getServerDefaults().getKeyProviderUri(), conf);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 3532a71..12bc73c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -102,8 +102,8 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
 import org.apache.hadoop.util.ChunkedArrayList;
 import org.apache.hadoop.util.Progressable;
 
@@ -2818,11 +2818,13 @@ public class DistributedFileSystem extends FileSystem
   }
 
   @Override
-  public Token<?>[] addDelegationTokens(
-      final String renewer, Credentials credentials) throws IOException {
-    Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
-    return HdfsKMSUtil.addDelegationTokensForKeyProvider(
-        this, renewer, credentials, uri, tokens);
+  public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+      throws IOException {
+    KeyProvider keyProvider = getKeyProvider();
+    if (keyProvider instanceof DelegationTokenIssuer) {
+      return new DelegationTokenIssuer[]{(DelegationTokenIssuer)keyProvider};
+    }
+    return null;
   }
 
   public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org