You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xi...@apache.org on 2018/10/12 16:35:54 UTC
[2/2] hadoop git commit: HADOOP-14445. Use DelegationTokenIssuer to
create KMS delegation tokens that can authenticate to all KMS instances.
Contributed by Daryn Sharp, Xiao Chen, Rushabh S Shah.
HADOOP-14445. Use DelegationTokenIssuer to create KMS delegation tokens that can authenticate to all KMS instances.
Contributed by Daryn Sharp, Xiao Chen, Rushabh S Shah.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ec86b44
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ec86b44
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ec86b44
Branch: refs/heads/trunk
Commit: 5ec86b445cc492f52c33639efb6a09a0d2f27475
Parents: 6e0e6da
Author: Xiao Chen <xi...@apache.org>
Authored: Fri Oct 12 09:32:21 2018 -0700
Committer: Xiao Chen <xi...@apache.org>
Committed: Fri Oct 12 09:35:52 2018 -0700
----------------------------------------------------------------------
.../KeyProviderDelegationTokenExtension.java | 71 ++--
.../crypto/key/KeyProviderTokenIssuer.java | 4 +-
.../crypto/key/kms/KMSClientProvider.java | 220 ++++++++----
.../key/kms/LoadBalancingKMSClientProvider.java | 75 +++-
.../java/org/apache/hadoop/fs/FileSystem.java | 75 +---
.../web/DelegationTokenAuthenticatedURL.java | 25 +-
.../security/token/DelegationTokenIssuer.java | 112 ++++++
.../java/org/apache/hadoop/util/KMSUtil.java | 13 +-
...TestKeyProviderDelegationTokenExtension.java | 20 +-
.../crypto/key/kms/TestKMSClientProvider.java | 138 ++++++++
.../kms/TestLoadBalancingKMSClientProvider.java | 63 +++-
.../apache/hadoop/fs/TestFilterFileSystem.java | 3 +
.../org/apache/hadoop/fs/TestHarFileSystem.java | 3 +
.../hadoop/crypto/key/kms/server/TestKMS.java | 349 ++++++++++++++++---
.../java/org/apache/hadoop/hdfs/DFSClient.java | 11 +-
.../hadoop/hdfs/DistributedFileSystem.java | 14 +-
.../org/apache/hadoop/hdfs/HdfsKMSUtil.java | 60 ++--
.../hadoop/hdfs/web/WebHdfsFileSystem.java | 20 +-
18 files changed, 963 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
index a63b7d5..29c5bcd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.crypto.key;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException;
@@ -28,7 +32,8 @@ import java.io.IOException;
*/
public class KeyProviderDelegationTokenExtension extends
KeyProviderExtension
- <KeyProviderDelegationTokenExtension.DelegationTokenExtension> {
+ <KeyProviderDelegationTokenExtension.DelegationTokenExtension>
+ implements DelegationTokenIssuer {
private static DelegationTokenExtension DEFAULT_EXTENSION =
new DefaultDelegationTokenExtension();
@@ -36,22 +41,9 @@ public class KeyProviderDelegationTokenExtension extends
/**
* DelegationTokenExtension is a type of Extension that exposes methods
* needed to work with Delegation Tokens.
- */
- public interface DelegationTokenExtension extends
- KeyProviderExtension.Extension {
-
- /**
- * The implementer of this class will take a renewer and add all
- * delegation tokens associated with the renewer to the
- * <code>Credentials</code> object if it is not already present,
- * @param renewer the user allowed to renew the delegation tokens
- * @param credentials cache in which to add new delegation tokens
- * @return list of new delegation tokens
- * @throws IOException thrown if IOException if an IO error occurs.
- */
- Token<?>[] addDelegationTokens(final String renewer,
- Credentials credentials) throws IOException;
-
+ */
+ public interface DelegationTokenExtension
+ extends KeyProviderExtension.Extension, DelegationTokenIssuer {
/**
* Renews the given token.
* @param token The token to be renewed.
@@ -66,6 +58,12 @@ public class KeyProviderDelegationTokenExtension extends
* @throws IOException
*/
Void cancelDelegationToken(final Token<?> token) throws IOException;
+
+ // Do NOT call this. Only intended for internal use.
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ Token<?> selectDelegationToken(Credentials creds);
}
/**
@@ -82,6 +80,16 @@ public class KeyProviderDelegationTokenExtension extends
}
@Override
+ public String getCanonicalServiceName() {
+ return null;
+ }
+
+ @Override
+ public Token<?> getDelegationToken(String renewer) {
+ return null;
+ }
+
+ @Override
public long renewDelegationToken(final Token<?> token) throws IOException {
return 0;
}
@@ -90,26 +98,29 @@ public class KeyProviderDelegationTokenExtension extends
public Void cancelDelegationToken(final Token<?> token) throws IOException {
return null;
}
+
+ @Override
+ public Token<?> selectDelegationToken(Credentials creds) {
+ return null;
+ }
+
}
private KeyProviderDelegationTokenExtension(KeyProvider keyProvider,
DelegationTokenExtension extensions) {
super(keyProvider, extensions);
}
-
- /**
- * Passes the renewer and Credentials object to the underlying
- * {@link DelegationTokenExtension}
- * @param renewer the user allowed to renew the delegation tokens
- * @param credentials cache in which to add new delegation tokens
- * @return list of new delegation tokens
- * @throws IOException thrown if IOException if an IO error occurs.
- */
- public Token<?>[] addDelegationTokens(final String renewer,
- Credentials credentials) throws IOException {
- return getExtension().addDelegationTokens(renewer, credentials);
+
+ @Override
+ public String getCanonicalServiceName() {
+ return getExtension().getCanonicalServiceName();
}
-
+
+ @Override
+ public Token<?> getDelegationToken(final String renewer) throws IOException {
+ return getExtension().getDelegationToken(renewer);
+ }
+
/**
* Creates a <code>KeyProviderDelegationTokenExtension</code> using a given
* {@link KeyProvider}.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java
index aa5de2c..81caff4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java
@@ -22,15 +22,17 @@ import java.net.URI;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
/**
* File systems that support Encryption Zones have to implement this interface.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public interface KeyProviderTokenIssuer {
+public interface KeyProviderTokenIssuer extends DelegationTokenIssuer {
KeyProvider getKeyProvider() throws IOException;
URI getKeyProviderUri() throws IOException;
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
index 26b528c..1718a1f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
@@ -32,14 +32,13 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.util.HttpExceptionUtils;
import org.apache.hadoop.util.JsonSerialization;
@@ -58,7 +57,6 @@ import java.io.Writer;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.SocketTimeoutException;
import java.net.URI;
@@ -99,7 +97,7 @@ import static org.apache.hadoop.util.KMSUtil.parseJSONMetadata;
public class KMSClientProvider extends KeyProvider implements CryptoExtension,
KeyProviderDelegationTokenExtension.DelegationTokenExtension {
- private static final Logger LOG =
+ static final Logger LOG =
LoggerFactory.getLogger(KMSClientProvider.class);
private static final String INVALID_SIGNATURE = "Invalid signature";
@@ -133,12 +131,12 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
private final ValueQueue<EncryptedKeyVersion> encKeyVersionQueue;
+ private KeyProviderDelegationTokenExtension.DelegationTokenExtension
+ clientTokenProvider = this;
+ // the token's service.
private final Text dtService;
-
- // Allow fallback to default kms server port 9600 for certain tests that do
- // not specify the port explicitly in the kms provider url.
- @VisibleForTesting
- public static volatile boolean fallbackDefaultPortForTesting = false;
+ // alias in the credentials.
+ private final Text canonicalService;
private class EncryptedQueueRefiller implements
ValueQueue.QueueRefiller<EncryptedKeyVersion> {
@@ -162,6 +160,14 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
}
}
+ static class TokenSelector extends AbstractDelegationTokenSelector {
+ static final TokenSelector INSTANCE = new TokenSelector();
+
+ TokenSelector() {
+ super(TOKEN_KIND);
+ }
+ }
+
/**
* The KMS implementation of {@link TokenRenewer}.
*/
@@ -182,8 +188,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
@Override
public long renew(Token<?> token, Configuration conf) throws IOException {
LOG.debug("Renewing delegation token {}", token);
- KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
- KeyProviderFactory.KEY_PROVIDER_PATH);
+ KeyProvider keyProvider = createKeyProvider(token, conf);
try {
if (!(keyProvider instanceof
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
@@ -204,8 +209,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
@Override
public void cancel(Token<?> token, Configuration conf) throws IOException {
LOG.debug("Canceling delegation token {}", token);
- KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
- KeyProviderFactory.KEY_PROVIDER_PATH);
+ KeyProvider keyProvider = createKeyProvider(token, conf);
try {
if (!(keyProvider instanceof
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
@@ -222,6 +226,19 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
}
}
}
+
+ private static KeyProvider createKeyProvider(
+ Token<?> token, Configuration conf) throws IOException {
+ String service = token.getService().toString();
+ URI uri;
+ if (service != null && service.startsWith(SCHEME_NAME + ":/")) {
+ LOG.debug("Creating key provider with token service value {}", service);
+ uri = URI.create(service);
+ } else { // conf fallback
+ uri = KMSUtil.getKeyProviderUri(conf);
+ }
+ return (uri != null) ? KMSUtil.createKeyProviderFromUri(conf, uri) : null;
+ }
}
public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
@@ -283,12 +300,14 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
}
hostsPart = t[0];
}
- return createProvider(conf, origUrl, port, hostsPart);
+ KMSClientProvider[] providers =
+ createProviders(conf, origUrl, port, hostsPart);
+ return new LoadBalancingKMSClientProvider(providerUri, providers, conf);
}
return null;
}
- private KeyProvider createProvider(Configuration conf,
+ private KMSClientProvider[] createProviders(Configuration conf,
URL origUrl, int port, String hostsPart) throws IOException {
String[] hosts = hostsPart.split(";");
KMSClientProvider[] providers = new KMSClientProvider[hosts.length];
@@ -302,7 +321,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
throw new IOException("Could not instantiate KMSProvider.", e);
}
}
- return new LoadBalancingKMSClientProvider(providers, conf);
+ return providers;
}
}
@@ -358,13 +377,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
public KMSClientProvider(URI uri, Configuration conf) throws IOException {
super(conf);
kmsUrl = createServiceURL(extractKMSPath(uri));
- int kmsPort = kmsUrl.getPort();
- if ((kmsPort == -1) && fallbackDefaultPortForTesting) {
- kmsPort = 9600;
- }
-
- InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort);
- dtService = SecurityUtil.buildTokenService(addr);
+ // the token's service so it can be instantiated for renew/cancel.
+ dtService = getDtService(uri);
+ // the canonical service is the alias for the token in the credentials.
+ // typically it's the actual service in the token but older clients expect
+ // an address.
+ URI serviceUri = URI.create(kmsUrl.toString());
+ canonicalService = SecurityUtil.buildTokenService(serviceUri);
if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
@@ -402,8 +421,22 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
new EncryptedQueueRefiller());
authToken = new DelegationTokenAuthenticatedURL.Token();
- LOG.debug("KMSClientProvider for KMS url: {} delegation token service: {}" +
- " created.", kmsUrl, dtService);
+ LOG.debug("KMSClientProvider created for KMS url: {} delegation token "
+ + "service: {} canonical service: {}.", kmsUrl, dtService,
+ canonicalService);
+ }
+
+ protected static Text getDtService(URI uri) {
+ Text service;
+ // remove fragment for forward compatibility with logical naming.
+ final String fragment = uri.getFragment();
+ if (fragment != null) {
+ service = new Text(
+ uri.getScheme() + ":" + uri.getSchemeSpecificPart());
+ } else {
+ service = new Text(uri.toString());
+ }
+ return service;
}
private static Path extractKMSPath(URI uri) throws MalformedURLException, IOException {
@@ -475,7 +508,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
@Override
public HttpURLConnection run() throws Exception {
DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(configurator);
+ createAuthenticatedURL();
return authUrl.openConnection(url, authToken, doAsUser);
}
});
@@ -931,6 +964,96 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
return encKeyVersionQueue.getSize(keyName);
}
+ // note: this is only a crutch for backwards compatibility.
+ // override the instance that will be used to select a token, intended
+ // to allow load balancing provider to find a token issued by any of its
+ // sub-providers.
+ protected void setClientTokenProvider(
+ KeyProviderDelegationTokenExtension.DelegationTokenExtension provider) {
+ clientTokenProvider = provider;
+ }
+
+ @VisibleForTesting
+ DelegationTokenAuthenticatedURL createAuthenticatedURL() {
+ return new DelegationTokenAuthenticatedURL(configurator) {
+ @Override
+ public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+ selectDelegationToken(URL url, Credentials creds) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Looking for delegation token. creds: {}",
+ creds.getAllTokens());
+ }
+ // clientTokenProvider is either "this" or a load balancing instance.
+ // if the latter, it will first look for the load balancer's uri
+ // service followed by each sub-provider for backwards-compatibility.
+ return clientTokenProvider.selectDelegationToken(creds);
+ }
+ };
+ }
+
+ @InterfaceAudience.Private
+ @Override
+ public Token<?> selectDelegationToken(Credentials creds) {
+ Token<?> token = selectDelegationToken(creds, dtService);
+ if (token == null) {
+ token = selectDelegationToken(creds, canonicalService);
+ }
+ return token;
+ }
+
+ protected static Token<?> selectDelegationToken(Credentials creds,
+ Text service) {
+ Token<?> token = creds.getToken(service);
+ LOG.debug("selected by alias={} token={}", service, token);
+ if (token != null && TOKEN_KIND.equals(token.getKind())) {
+ return token;
+ }
+ token = TokenSelector.INSTANCE.selectToken(service, creds.getAllTokens());
+ LOG.debug("selected by service={} token={}", service, token);
+ return token;
+ }
+
+ @Override
+ public String getCanonicalServiceName() {
+ return canonicalService.toString();
+ }
+
+ @Override
+ public Token<?> getDelegationToken(final String renewer) throws IOException {
+ final URL url = createURL(null, null, null, null);
+ final DelegationTokenAuthenticatedURL authUrl =
+ new DelegationTokenAuthenticatedURL(configurator);
+ Token<?> token = null;
+ try {
+ final String doAsUser = getDoAsUser();
+ token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
+ @Override
+ public Token<?> run() throws Exception {
+ // Not using the cached token here.. Creating a new token here
+ // everytime.
+ LOG.debug("Getting new token from {}, renewer:{}", url, renewer);
+ return authUrl.getDelegationToken(url,
+ new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
+ }
+ });
+ if (token != null) {
+ token.setService(dtService);
+ LOG.info("New token created: ({})", token);
+ } else {
+ throw new IOException("Got NULL as delegation token");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ throw (IOException) e;
+ } else {
+ throw new IOException(e);
+ }
+ }
+ return token;
+ }
+
@Override
public long renewDelegationToken(final Token<?> dToken) throws IOException {
try {
@@ -941,7 +1064,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
LOG.debug("Renewing delegation token {} with url:{}, as:{}",
token, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(configurator);
+ createAuthenticatedURL();
return getActualUgi().doAs(
new PrivilegedExceptionAction<Long>() {
@Override
@@ -973,7 +1096,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
dToken, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(configurator);
+ createAuthenticatedURL();
authUrl.cancelDelegationToken(url, token, doAsUser);
return null;
}
@@ -1025,47 +1148,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
return token;
}
- @Override
- public Token<?>[] addDelegationTokens(final String renewer,
- Credentials credentials) throws IOException {
- Token<?>[] tokens = null;
- Token<?> token = credentials.getToken(dtService);
- if (token == null) {
- final URL url = createURL(null, null, null, null);
- final DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(configurator);
- try {
- final String doAsUser = getDoAsUser();
- token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
- @Override
- public Token<?> run() throws Exception {
- // Not using the cached token here.. Creating a new token here
- // everytime.
- LOG.info("Getting new token from {}, renewer:{}", url, renewer);
- return authUrl.getDelegationToken(url,
- new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
- }
- });
- if (token != null) {
- LOG.info("New token received: ({})", token);
- credentials.addToken(token.getService(), token);
- tokens = new Token<?>[] { token };
- } else {
- throw new IOException("Got NULL as delegation token");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- if (e instanceof IOException) {
- throw (IOException) e;
- } else {
- throw new IOException(e);
- }
- }
- }
- return tokens;
- }
-
private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
// Add existing credentials from the UGI, since provider is cached.
Credentials creds = ugi.getCredentials();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
index e68e844..6cb2cdc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.crypto.key.kms;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
+import java.net.URI;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
@@ -36,12 +37,15 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,19 +80,42 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
private final KMSClientProvider[] providers;
private final AtomicInteger currentIdx;
+ private final Text dtService; // service in token.
+ private final Text canonicalService; // credentials alias for token.
private RetryPolicy retryPolicy = null;
- public LoadBalancingKMSClientProvider(KMSClientProvider[] providers,
- Configuration conf) {
- this(shuffle(providers), Time.monotonicNow(), conf);
+ public LoadBalancingKMSClientProvider(URI providerUri,
+ KMSClientProvider[] providers, Configuration conf) {
+ this(providerUri, providers, Time.monotonicNow(), conf);
}
@VisibleForTesting
LoadBalancingKMSClientProvider(KMSClientProvider[] providers, long seed,
Configuration conf) {
+ this(URI.create("kms://testing"), providers, seed, conf);
+ }
+
+ private LoadBalancingKMSClientProvider(URI uri,
+ KMSClientProvider[] providers, long seed, Configuration conf) {
super(conf);
- this.providers = providers;
+ // uri is the token service so it can be instantiated for renew/cancel.
+ dtService = KMSClientProvider.getDtService(uri);
+ // if provider not in conf, new client will alias on uri else addr.
+ if (KMSUtil.getKeyProviderUri(conf) == null) {
+ canonicalService = dtService;
+ } else {
+ // canonical service (credentials alias) will be the first underlying
+ // provider's service. must be deterministic before shuffle so multiple
+ // calls for a token do not obtain another unnecessary token.
+ canonicalService = new Text(providers[0].getCanonicalServiceName());
+ }
+
+ // shuffle unless seed is 0 which is used by tests for determinism.
+ this.providers = (seed != 0) ? shuffle(providers) : providers;
+ for (KMSClientProvider provider : providers) {
+ provider.setClientTokenProvider(this);
+ }
this.currentIdx = new AtomicInteger((int)(seed % providers.length));
int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic.
KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length);
@@ -106,6 +133,9 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
this.retryPolicy = RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, maxNumRetries, 0, sleepBaseMillis,
sleepMaxMillis);
+ LOG.debug("Created LoadBalancingKMSClientProvider for KMS url: {} with {} "
+ + "providers. delegation token service: {}, canonical service: {}",
+ uri, providers.length, dtService, canonicalService);
}
@VisibleForTesting
@@ -113,6 +143,23 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
return providers;
}
+ @Override
+ public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+ selectDelegationToken(Credentials creds) {
+ Token<? extends TokenIdentifier> token =
+ KMSClientProvider.selectDelegationToken(creds, canonicalService);
+ // fallback to querying each sub-provider.
+ if (token == null) {
+ for (KMSClientProvider provider : getProviders()) {
+ token = provider.selectDelegationToken(creds);
+ if (token != null) {
+ break;
+ }
+ }
+ }
+ return token;
+ }
+
private <T> T doOp(ProviderCallable<T> op, int currPos,
boolean isIdempotent) throws IOException {
if (providers.length == 0) {
@@ -193,13 +240,21 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
}
@Override
- public Token<?>[]
- addDelegationTokens(final String renewer, final Credentials credentials)
- throws IOException {
- return doOp(new ProviderCallable<Token<?>[]>() {
+ public String getCanonicalServiceName() {
+ return canonicalService.toString();
+ }
+
+ @Override
+ public Token<?> getDelegationToken(String renewer) throws IOException {
+ return doOp(new ProviderCallable<Token<?>>() {
@Override
- public Token<?>[] call(KMSClientProvider provider) throws IOException {
- return provider.addDelegationTokens(renewer, credentials);
+ public Token<?> call(KMSClientProvider provider) throws IOException {
+ Token<?> token = provider.getDelegationToken(renewer);
+ // override sub-providers service with our own so it can be used
+ // across all providers.
+ token.setService(dtService);
+ LOG.debug("New token service set. Token: ({})", token);
+ return token;
}
}, nextIdx(), false);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 66b6d44..3d40b6a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -58,13 +58,13 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
@@ -121,7 +121,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
@SuppressWarnings("DeprecatedIsStillUsed")
@InterfaceAudience.Public
@InterfaceStability.Stable
-public abstract class FileSystem extends Configured implements Closeable {
+public abstract class FileSystem extends Configured
+ implements Closeable, DelegationTokenIssuer {
public static final String FS_DEFAULT_NAME_KEY =
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
public static final String DEFAULT_FS =
@@ -386,6 +387,7 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
+ @Override
public String getCanonicalServiceName() {
return (getChildFileSystems() == null)
? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
@@ -600,72 +602,12 @@ public abstract class FileSystem extends Configured implements Closeable {
* @throws IOException on any problem obtaining a token
*/
@InterfaceAudience.Private()
+ @Override
public Token<?> getDelegationToken(String renewer) throws IOException {
return null;
}
/**
- * Obtain all delegation tokens used by this FileSystem that are not
- * already present in the given Credentials. Existing tokens will neither
- * be verified as valid nor having the given renewer. Missing tokens will
- * be acquired and added to the given Credentials.
- *
- * Default Impl: works for simple FS with its own token
- * and also for an embedded FS whose tokens are those of its
- * child FileSystems (i.e. the embedded FS has no tokens of its own).
- *
- * @param renewer the user allowed to renew the delegation tokens
- * @param credentials cache in which to add new delegation tokens
- * @return list of new delegation tokens
- * @throws IOException problems obtaining a token
- */
- @InterfaceAudience.Public
- @InterfaceStability.Evolving
- public Token<?>[] addDelegationTokens(
- final String renewer, Credentials credentials) throws IOException {
- if (credentials == null) {
- credentials = new Credentials();
- }
- final List<Token<?>> tokens = new ArrayList<>();
- collectDelegationTokens(renewer, credentials, tokens);
- return tokens.toArray(new Token<?>[tokens.size()]);
- }
-
- /**
- * Recursively obtain the tokens for this FileSystem and all descendant
- * FileSystems as determined by {@link #getChildFileSystems()}.
- * @param renewer the user allowed to renew the delegation tokens
- * @param credentials cache in which to add the new delegation tokens
- * @param tokens list in which to add acquired tokens
- * @throws IOException problems obtaining a token
- */
- private void collectDelegationTokens(final String renewer,
- final Credentials credentials,
- final List<Token<?>> tokens)
- throws IOException {
- final String serviceName = getCanonicalServiceName();
- // Collect token of the this filesystem and then of its embedded children
- if (serviceName != null) { // fs has token, grab it
- final Text service = new Text(serviceName);
- Token<?> token = credentials.getToken(service);
- if (token == null) {
- token = getDelegationToken(renewer);
- if (token != null) {
- tokens.add(token);
- credentials.addToken(service, token);
- }
- }
- }
- // Now collect the tokens from the children
- final FileSystem[] children = getChildFileSystems();
- if (children != null) {
- for (final FileSystem fs : children) {
- fs.collectDelegationTokens(renewer, credentials, tokens);
- }
- }
- }
-
- /**
* Get all the immediate child FileSystems embedded in this FileSystem.
* It does not recurse and get grand children. If a FileSystem
* has multiple child FileSystems, then it must return a unique list
@@ -680,6 +622,13 @@ public abstract class FileSystem extends Configured implements Closeable {
return null;
}
+ @InterfaceAudience.Private
+ @Override
+ public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+ throws IOException {
+ return getChildFileSystems();
+ }
+
/**
* Create a file with the provided permission.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
index 35589a2..4e9881b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
@@ -296,15 +296,11 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
Credentials creds = UserGroupInformation.getCurrentUser().
getCredentials();
if (LOG.isDebugEnabled()) {
- LOG.debug("Token not set, looking for delegation token. Creds:{}",
- creds.getAllTokens());
+ LOG.debug("Token not set, looking for delegation token. Creds:{},"
+ + " size:{}", creds.getAllTokens(), creds.numberOfTokens());
}
if (!creds.getAllTokens().isEmpty()) {
- InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
- url.getPort());
- Text service = SecurityUtil.buildTokenService(serviceAddr);
- dToken = creds.getToken(service);
- LOG.debug("Using delegation token {} from service:{}", dToken, service);
+ dToken = selectDelegationToken(url, creds);
if (dToken != null) {
if (useQueryStringForDelegationToken()) {
// delegation token will go in the query string, injecting it
@@ -341,6 +337,21 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
}
/**
+ * Select a delegation token from all tokens in credentials, based on url.
+ */
+ @InterfaceAudience.Private
+ public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+ selectDelegationToken(URL url, Credentials creds) {
+ final InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
+ url.getPort());
+ final Text service = SecurityUtil.buildTokenService(serviceAddr);
+ org.apache.hadoop.security.token.Token<? extends TokenIdentifier> dToken =
+ creds.getToken(service);
+ LOG.debug("Using delegation token {} from service:{}", dToken, service);
+ return dToken;
+ }
+
+ /**
* Requests a delegation token using the configured <code>Authenticator</code>
* for authentication.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java
new file mode 100644
index 0000000..90e72b9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.token.org.apache.hadoop.security.token;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class for issuing delegation tokens.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "Yarn"})
+@InterfaceStability.Unstable
+public interface DelegationTokenIssuer {
+
+ /**
+ * The service name used as the alias for the token in the credential
+ * token map. addDelegationTokens will use this to determine if
+ * a token exists, and if not, add a new token with this alias.
+ */
+ String getCanonicalServiceName();
+
+ /**
+ * Unconditionally get a new token with the optional renewer. Returning
+ * null indicates the service does not issue tokens.
+ */
+ Token<?> getDelegationToken(String renewer) throws IOException;
+
+ /**
+ * Issuers may need tokens from additional services.
+ */
+ default DelegationTokenIssuer[] getAdditionalTokenIssuers()
+ throws IOException {
+ return null;
+ }
+
+ /**
+ * Given a renewer, add delegation tokens for issuer and it's child issuers
+ * to the <code>Credentials</code> object if it is not already present.
+ *<p>
+ * Note: This method is not intended to be overridden. Issuers should
+ * implement getCanonicalService and getDelegationToken to ensure
+ * consistent token acquisition behavior.
+ *
+ * @param renewer the user allowed to renew the delegation tokens
+ * @param credentials cache in which to add new delegation tokens
+ * @return list of new delegation tokens
+ * @throws IOException thrown if IOException if an IO error occurs.
+ */
+ default Token<?>[] addDelegationTokens(
+ final String renewer, Credentials credentials) throws IOException {
+ if (credentials == null) {
+ credentials = new Credentials();
+ }
+ final List<Token<?>> tokens = new ArrayList<>();
+ collectDelegationTokens(this, renewer, credentials, tokens);
+ return tokens.toArray(new Token<?>[tokens.size()]);
+ }
+
+ /**
+ * NEVER call this method directly.
+ */
+ @InterfaceAudience.Private
+ static void collectDelegationTokens(
+ final DelegationTokenIssuer issuer,
+ final String renewer,
+ final Credentials credentials,
+ final List<Token<?>> tokens) throws IOException {
+ final String serviceName = issuer.getCanonicalServiceName();
+ // Collect token of the this issuer and then of its embedded children
+ if (serviceName != null) {
+ final Text service = new Text(serviceName);
+ Token<?> token = credentials.getToken(service);
+ if (token == null) {
+ token = issuer.getDelegationToken(renewer);
+ if (token != null) {
+ tokens.add(token);
+ credentials.addToken(service, token);
+ }
+ }
+ }
+ // Now collect the tokens from the children.
+ final DelegationTokenIssuer[] ancillary =
+ issuer.getAdditionalTokenIssuers();
+ if (ancillary != null) {
+ for (DelegationTokenIssuer subIssuer : ancillary) {
+ collectDelegationTokens(subIssuer, renewer, credentials, tokens);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
index c96c6fb..5b48da1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
@@ -59,12 +59,23 @@ public final class KMSUtil {
public static KeyProvider createKeyProvider(final Configuration conf,
final String configKeyName) throws IOException {
LOG.debug("Creating key provider with config key {}", configKeyName);
+ URI uri = getKeyProviderUri(conf, configKeyName);
+ return (uri != null) ? createKeyProviderFromUri(conf, uri) : null;
+ }
+
+ public static URI getKeyProviderUri(final Configuration conf) {
+ return KMSUtil.getKeyProviderUri(
+ conf, KeyProviderFactory.KEY_PROVIDER_PATH);
+ }
+
+ public static URI getKeyProviderUri(final Configuration conf,
+ final String configKeyName) {
final String providerUriStr = conf.getTrimmed(configKeyName);
// No provider set in conf
if (providerUriStr == null || providerUriStr.isEmpty()) {
return null;
}
- return createKeyProviderFromUri(conf, URI.create(providerUriStr));
+ return URI.create(providerUriStr);
}
public static KeyProvider createKeyProviderFromUri(final Configuration conf,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java
index df5d3e8..4fabc5b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java
@@ -51,23 +51,27 @@ public class TestKeyProviderDelegationTokenExtension {
KeyProviderDelegationTokenExtension
.createKeyProviderDelegationTokenExtension(kp);
Assert.assertNotNull(kpDTE1);
- // Default implementation should be a no-op and return null
- Assert.assertNull(kpDTE1.addDelegationTokens("user", credentials));
+ Token<?>[] tokens = kpDTE1.addDelegationTokens("user", credentials);
+ // Default implementation should return no tokens.
+ Assert.assertNotNull(tokens);
+ Assert.assertEquals(0, tokens.length);
MockKeyProvider mock = mock(MockKeyProvider.class);
Mockito.when(mock.getConf()).thenReturn(new Configuration());
- when(mock.addDelegationTokens("renewer", credentials)).thenReturn(
- new Token<?>[]{new Token(null, null, new Text("kind"), new Text(
- "service"))}
+ when(mock.getCanonicalServiceName()).thenReturn("cservice");
+ when(mock.getDelegationToken("renewer")).thenReturn(
+ new Token(null, null, new Text("kind"), new Text(
+ "tservice"))
);
KeyProviderDelegationTokenExtension kpDTE2 =
KeyProviderDelegationTokenExtension
.createKeyProviderDelegationTokenExtension(mock);
- Token<?>[] tokens =
- kpDTE2.addDelegationTokens("renewer", credentials);
+ tokens = kpDTE2.addDelegationTokens("renewer", credentials);
Assert.assertNotNull(tokens);
+ Assert.assertEquals(1, tokens.length);
Assert.assertEquals("kind", tokens[0].getKind().toString());
-
+ Assert.assertEquals("tservice", tokens[0].getService().toString());
+ Assert.assertNotNull(credentials.getToken(new Text("cservice")));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
new file mode 100644
index 0000000..b87f45a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit test for {@link KMSClientProvider} class.
+ */
+public class TestKMSClientProvider {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestKMSClientProvider.class);
+
+ private final Token token = new Token();
+ private final Token oldToken = new Token();
+ private final String urlString = "https://host:16000/kms";
+ private final String providerUriString = "kms://https@host:16000/kms";
+ private final String oldTokenService = "host:16000";
+
+ @Rule
+ public Timeout globalTimeout = new Timeout(60000);
+
+ {
+ GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
+ }
+
+ @Before
+ public void setup() {
+ SecurityUtil.setTokenServiceUseIp(false);
+ token.setKind(TOKEN_KIND);
+ token.setService(new Text(providerUriString));
+ oldToken.setKind(TOKEN_KIND);
+ oldToken.setService(new Text(oldTokenService));
+ }
+
+ @Test
+ public void testSelectDelegationToken() throws Exception {
+ final Credentials creds = new Credentials();
+ creds.addToken(new Text(providerUriString), token);
+ assertNull(KMSClientProvider.selectDelegationToken(creds, null));
+ assertNull(KMSClientProvider
+ .selectDelegationToken(creds, new Text(oldTokenService)));
+ assertEquals(token, KMSClientProvider
+ .selectDelegationToken(creds, new Text(providerUriString)));
+ }
+
+ @Test
+ public void testSelectTokenOldService() throws Exception {
+ final Configuration conf = new Configuration();
+ final URI uri = new URI(providerUriString);
+ final KMSClientProvider kp = new KMSClientProvider(uri, conf);
+ try {
+ final Credentials creds = new Credentials();
+ creds.addToken(new Text(oldTokenService), oldToken);
+ final Token t = kp.selectDelegationToken(creds);
+ assertEquals(oldToken, t);
+ } finally {
+ kp.close();
+ }
+ }
+
+ @Test
+ public void testSelectTokenWhenBothExist() throws Exception {
+ final Credentials creds = new Credentials();
+ final Configuration conf = new Configuration();
+ final URI uri = new URI(providerUriString);
+ final KMSClientProvider kp = new KMSClientProvider(uri, conf);
+ try {
+ creds.addToken(token.getService(), token);
+ creds.addToken(oldToken.getService(), oldToken);
+ final Token t = kp.selectDelegationToken(creds);
+ assertEquals("new token should be selected when both exist", token, t);
+ } finally {
+ kp.close();
+ }
+ }
+
+ @Test
+ public void testURLSelectTokenUriFormat() throws Exception {
+ testURLSelectToken(token);
+ }
+
+ @Test
+ public void testURLSelectTokenIpPort() throws Exception {
+ testURLSelectToken(oldToken);
+ }
+
+ private void testURLSelectToken(final Token tok)
+ throws URISyntaxException, IOException {
+ final Configuration conf = new Configuration();
+ final URI uri = new URI(providerUriString);
+ final KMSClientProvider kp = new KMSClientProvider(uri, conf);
+ final DelegationTokenAuthenticatedURL url = kp.createAuthenticatedURL();
+ final Credentials creds = new Credentials();
+ creds.addToken(tok.getService(), tok);
+ final Token chosen = url.selectDelegationToken(new URL(urlString), creds);
+ assertEquals(tok, chosen);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
index 058db92..c99c63d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.crypto.key.kms;
import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@@ -49,7 +50,6 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -68,33 +68,27 @@ public class TestLoadBalancingKMSClientProvider {
SecurityUtil.setTokenServiceUseIp(false);
}
- @After
- public void teardown() throws IOException {
- KMSClientProvider.fallbackDefaultPortForTesting = false;
- }
-
@Test
public void testCreation() throws Exception {
Configuration conf = new Configuration();
- KMSClientProvider.fallbackDefaultPortForTesting = true;
KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
- "kms://http@host1/kms/foo"), conf);
+ "kms://http@host1:9600/kms/foo"), conf);
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
KMSClientProvider[] providers =
((LoadBalancingKMSClientProvider) kp).getProviders();
assertEquals(1, providers.length);
- assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"),
+ assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"),
Sets.newHashSet(providers[0].getKMSUrl()));
kp = new KMSClientProvider.Factory().createProvider(new URI(
- "kms://http@host1;host2;host3/kms/foo"), conf);
+ "kms://http@host1;host2;host3:9600/kms/foo"), conf);
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
providers =
((LoadBalancingKMSClientProvider) kp).getProviders();
assertEquals(3, providers.length);
- assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/",
- "http://host2/kms/foo/v1/",
- "http://host3/kms/foo/v1/"),
+ assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/",
+ "http://host2:9600/kms/foo/v1/",
+ "http://host3:9600/kms/foo/v1/"),
Sets.newHashSet(providers[0].getKMSUrl(),
providers[1].getKMSUrl(),
providers[2].getKMSUrl()));
@@ -257,10 +251,9 @@ public class TestLoadBalancingKMSClientProvider {
@Test
public void testClassCastException() throws Exception {
Configuration conf = new Configuration();
- KMSClientProvider.fallbackDefaultPortForTesting = true;
KMSClientProvider p1 = new MyKMSClientProvider(
- new URI("kms://http@host1/kms/foo"), conf);
- LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
+ new URI("kms://http@host1:9600/kms/foo"), conf);
+ LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
new KMSClientProvider[] {p1}, 0, conf);
try {
kp.generateEncryptedKey("foo");
@@ -878,4 +871,42 @@ public class TestLoadBalancingKMSClientProvider {
verify(kp.getProviders()[2], Mockito.times(1))
.createKey(Mockito.eq(keyName), Mockito.any(Options.class));
}
+
+ @Test
+ public void testTokenServiceCreationWithLegacyFormat() throws Exception {
+ Configuration conf = new Configuration();
+ // Create keyprovider with old token format (ip:port)
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+ "kms:/something");
+ String authority = "host1:9600";
+ URI kmsUri = URI.create("kms://http@" + authority + "/kms/foo");
+ KeyProvider kp =
+ new KMSClientProvider.Factory().createProvider(kmsUri, conf);
+ assertTrue(kp instanceof LoadBalancingKMSClientProvider);
+ LoadBalancingKMSClientProvider lbkp = (LoadBalancingKMSClientProvider) kp;
+ assertEquals(1, lbkp.getProviders().length);
+ assertEquals(authority, lbkp.getCanonicalServiceName());
+ for (KMSClientProvider provider : lbkp.getProviders()) {
+ assertEquals(authority, provider.getCanonicalServiceName());
+ }
+ }
+
+ @Test
+ public void testTokenServiceCreationWithUriFormat() throws Exception {
+ final Configuration conf = new Configuration();
+ final URI kmsUri = URI.create("kms://http@host1;host2;host3:9600/kms/foo");
+ final KeyProvider kp =
+ new KMSClientProvider.Factory().createProvider(kmsUri, conf);
+ assertTrue(kp instanceof LoadBalancingKMSClientProvider);
+ final LoadBalancingKMSClientProvider lbkp =
+ (LoadBalancingKMSClientProvider) kp;
+ assertEquals(kmsUri.toString(), lbkp.getCanonicalServiceName());
+ KMSClientProvider[] providers = lbkp.getProviders();
+ assertEquals(3, providers.length);
+ for (int i = 0; i < providers.length; i++) {
+ assertEquals(URI.create(providers[i].getKMSUrl()).getAuthority(),
+ providers[i].getCanonicalServiceName());
+ assertNotEquals(kmsUri, providers[i].getCanonicalServiceName());
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index 0e9a612..a766cfb4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.Progressable;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -125,6 +126,8 @@ public class TestFilterFileSystem {
public int getDefaultPort();
public String getCanonicalServiceName();
public Token<?> getDelegationToken(String renewer) throws IOException;
+ public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+ throws IOException;
public boolean deleteOnExit(Path f) throws IOException;
public boolean cancelDeleteOnExit(Path f) throws IOException;
public Token<?>[] addDelegationTokens(String renewer, Credentials creds)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index 1b69693..870a828 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.Progressable;
import org.junit.Assert;
import org.junit.Test;
@@ -145,6 +146,8 @@ public class TestHarFileSystem {
public int getDefaultPort();
public String getCanonicalServiceName();
public Token<?> getDelegationToken(String renewer) throws IOException;
+ public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+ throws IOException;
public FileChecksum getFileChecksum(Path f) throws IOException;
public boolean deleteOnExit(Path f) throws IOException;
public boolean cancelDeleteOnExit(Path f) throws IOException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
index d709ba8..af59877 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
import org.apache.hadoop.crypto.key.kms.ValueQueue;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
@@ -44,6 +45,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.Whitebox;
@@ -96,6 +98,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -141,21 +145,78 @@ public class TestKMS {
}
public static abstract class KMSCallable<T> implements Callable<T> {
- private URL kmsUrl;
+ private List<URL> kmsUrl;
protected URL getKMSUrl() {
- return kmsUrl;
+ return kmsUrl.get(0);
+ }
+
+ protected URL[] getKMSHAUrl() {
+ URL[] urls = new URL[kmsUrl.size()];
+ return kmsUrl.toArray(urls);
+ }
+
+ protected void addKMSUrl(URL url) {
+ if (kmsUrl == null) {
+ kmsUrl = new ArrayList<URL>();
+ }
+ kmsUrl.add(url);
+ }
+
+ /*
+ * The format of the returned value will be
+ * kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2
+ */
+ protected String generateLoadBalancingKeyProviderUriString() {
+ if (kmsUrl == null || kmsUrl.size() == 0) {
+ return null;
+ }
+ StringBuffer sb = new StringBuffer();
+
+ for (int i = 0; i < kmsUrl.size(); i++) {
+ sb.append(KMSClientProvider.SCHEME_NAME + "://" +
+ kmsUrl.get(0).getProtocol() + "@");
+ URL url = kmsUrl.get(i);
+ sb.append(url.getAuthority());
+ if (url.getPath() != null) {
+ sb.append(url.getPath());
+ }
+ if (i < kmsUrl.size() - 1) {
+ sb.append(",");
+ }
+ }
+ return sb.toString();
}
}
protected KeyProvider createProvider(URI uri, Configuration conf)
throws IOException {
- final KeyProvider ret = new LoadBalancingKMSClientProvider(
+ final KeyProvider ret = new LoadBalancingKMSClientProvider(uri,
new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf);
providersCreated.add(ret);
return ret;
}
+ /**
+ * create a LoadBalancingKMSClientProvider from an array of URIs.
+ * @param uris an array of KMS URIs
+ * @param conf configuration object
+ * @return a LoadBalancingKMSClientProvider object
+ * @throws IOException
+ */
+ protected LoadBalancingKMSClientProvider createHAProvider(URI lbUri,
+ URI[] uris, Configuration conf) throws IOException {
+ KMSClientProvider[] providers = new KMSClientProvider[uris.length];
+ for (int i = 0; i < providers.length; i++) {
+ providers[i] =
+ new KMSClientProvider(uris[i], conf);
+ }
+ final LoadBalancingKMSClientProvider ret =
+ new LoadBalancingKMSClientProvider(lbUri, providers, conf);
+ providersCreated.add(ret);
+ return ret;
+ }
+
private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf)
throws IOException {
final KMSClientProvider ret = new KMSClientProvider(uri, conf);
@@ -170,22 +231,34 @@ public class TestKMS {
protected <T> T runServer(int port, String keystore, String password, File confDir,
KMSCallable<T> callable) throws Exception {
+ return runServer(new int[] {port}, keystore, password, confDir, callable);
+ }
+
+ protected <T> T runServer(int[] ports, String keystore, String password,
+ File confDir, KMSCallable<T> callable) throws Exception {
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
.setLog4jConfFile("log4j.properties");
if (keystore != null) {
miniKMSBuilder.setSslConf(new File(keystore), password);
}
- if (port > 0) {
- miniKMSBuilder.setPort(port);
+ final List<MiniKMS> kmsList = new ArrayList<>();
+
+ for (int i = 0; i < ports.length; i++) {
+ if (ports[i] > 0) {
+ miniKMSBuilder.setPort(ports[i]);
+ }
+ MiniKMS miniKMS = miniKMSBuilder.build();
+ kmsList.add(miniKMS);
+ miniKMS.start();
+ LOG.info("Test KMS running at: " + miniKMS.getKMSUrl());
+ callable.addKMSUrl(miniKMS.getKMSUrl());
}
- MiniKMS miniKMS = miniKMSBuilder.build();
- miniKMS.start();
try {
- System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
- callable.kmsUrl = miniKMS.getKMSUrl();
return callable.call();
} finally {
- miniKMS.stop();
+ for (MiniKMS miniKMS: kmsList) {
+ miniKMS.stop();
+ }
}
}
@@ -240,6 +313,13 @@ public class TestKMS {
return new URI("kms://" + str);
}
+ public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception {
+ URI[] uris = new URI[kmsUrls.length];
+ for (int i=0; i< kmsUrls.length; i++) {
+ uris[i] = createKMSUri(kmsUrls[i]);
+ }
+ return uris;
+ }
private static class KerberosConfiguration
extends javax.security.auth.login.Configuration {
@@ -306,6 +386,7 @@ public class TestKMS {
principals.add("otheradmin");
principals.add("client/host");
principals.add("client1");
+ principals.add("foo");
for (KMSACLs.Type type : KMSACLs.Type.values()) {
principals.add(type.toString());
}
@@ -2011,7 +2092,6 @@ public class TestKMS {
return null;
}
});
-
nonKerberosUgi.addCredentials(credentials);
try {
@@ -2067,6 +2147,18 @@ public class TestKMS {
testDelegationTokensOps(true, true);
}
+ private Text getTokenService(KeyProvider provider) {
+ assertTrue("KeyProvider should be an instance of " +
+ "LoadBalancingKMSClientProvider", (provider instanceof
+ LoadBalancingKMSClientProvider));
+ assertEquals("Num client providers should be 1", 1,
+ ((LoadBalancingKMSClientProvider)provider).getProviders().length);
+ final Text tokenService = new Text(
+ (((LoadBalancingKMSClientProvider)provider).getProviders()[0])
+ .getCanonicalServiceName());
+ return tokenService;
+ }
+
private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
throws Exception {
final File confDir = getTestDir();
@@ -2103,6 +2195,10 @@ public class TestKMS {
@Override
public Void run() throws Exception {
KeyProvider kp = createProvider(uri, clientConf);
+ // Unset the conf value for key provider path just to be sure that
+ // the key provider created for renew and cancel token is from
+ // token service field.
+ clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
// test delegation token retrieval
KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
@@ -2110,13 +2206,10 @@ public class TestKMS {
final Credentials credentials = new Credentials();
final Token<?>[] tokens =
kpdte.addDelegationTokens("client1", credentials);
+ Text tokenService = getTokenService(kp);
Assert.assertEquals(1, credentials.getAllTokens().size());
- InetSocketAddress kmsAddr =
- new InetSocketAddress(getKMSUrl().getHost(),
- getKMSUrl().getPort());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
- credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
- getKind());
+ credentials.getToken(tokenService).getKind());
// Test non-renewer user cannot renew.
for (Token<?> token : tokens) {
@@ -2258,15 +2351,16 @@ public class TestKMS {
// Get a DT and use it.
final Credentials credentials = new Credentials();
kpdte.addDelegationTokens("client", credentials);
+ Text tokenService = getTokenService(kp);
Assert.assertEquals(1, credentials.getAllTokens().size());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials.
- getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
+ getToken(tokenService).getKind());
UserGroupInformation.getCurrentUser().addCredentials(credentials);
LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
getCurrentUser().getCredentials().getAllTokens());
Token<?> token =
UserGroupInformation.getCurrentUser().getCredentials()
- .getToken(SecurityUtil.buildTokenService(kmsAddr));
+ .getToken(tokenService);
Assert.assertNotNull(token);
job1Token.add(token);
@@ -2302,17 +2396,17 @@ public class TestKMS {
// Get a new DT, but don't use it yet.
final Credentials newCreds = new Credentials();
kpdte.addDelegationTokens("client", newCreds);
+ Text tokenService = getTokenService(kp);
Assert.assertEquals(1, newCreds.getAllTokens().size());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
- newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
+ newCreds.getToken(tokenService).
getKind());
// Using job 1's DT should fail.
final Credentials oldCreds = new Credentials();
for (Token<?> token : job1Token) {
if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) {
- oldCreds
- .addToken(SecurityUtil.buildTokenService(kmsAddr), token);
+ oldCreds.addToken(tokenService, token);
}
}
UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
@@ -2328,7 +2422,7 @@ public class TestKMS {
// Using the new DT should succeed.
Assert.assertEquals(1, newCreds.getAllTokens().size());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
- newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
+ newCreds.getToken(tokenService).
getKind());
UserGroupInformation.getCurrentUser().addCredentials(newCreds);
LOG.info("Credetials now are: {}", UserGroupInformation
@@ -2357,7 +2451,14 @@ public class TestKMS {
doKMSWithZK(true, true);
}
- public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
+ private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
+ KMSCallable<T> callable) throws Exception {
+ return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1);
+ }
+
+ private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
+ KMSCallable<T> callable, int kmsSize) throws Exception {
+
TestingServer zkServer = null;
try {
zkServer = new TestingServer();
@@ -2403,43 +2504,189 @@ public class TestKMS {
writeConf(testDir, conf);
- KMSCallable<KeyProvider> c =
- new KMSCallable<KeyProvider>() {
- @Override
- public KeyProvider call() throws Exception {
- final Configuration conf = new Configuration();
- conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
- final URI uri = createKMSUri(getKMSUrl());
-
- final KeyProvider kp =
- doAs("SET_KEY_MATERIAL",
- new PrivilegedExceptionAction<KeyProvider>() {
- @Override
- public KeyProvider run() throws Exception {
- KeyProvider kp = createProvider(uri, conf);
- kp.createKey("k1", new byte[16],
- new KeyProvider.Options(conf));
- kp.createKey("k2", new byte[16],
- new KeyProvider.Options(conf));
- kp.createKey("k3", new byte[16],
- new KeyProvider.Options(conf));
- return kp;
- }
- });
- return kp;
- }
- };
-
- runServer(null, null, testDir, c);
+ int[] ports = new int[kmsSize];
+ for (int i = 0; i < ports.length; i++) {
+ ports[i] = -1;
+ }
+ return runServer(ports, null, null, testDir, callable);
} finally {
if (zkServer != null) {
zkServer.stop();
zkServer.close();
}
}
+ }
+ public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
+ KMSCallable<KeyProvider> c =
+ new KMSCallable<KeyProvider>() {
+ @Override
+ public KeyProvider call() throws Exception {
+ final Configuration conf = new Configuration();
+ conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+ final URI uri = createKMSUri(getKMSUrl());
+
+ final KeyProvider kp =
+ doAs("SET_KEY_MATERIAL",
+ new PrivilegedExceptionAction<KeyProvider>() {
+ @Override
+ public KeyProvider run() throws Exception {
+ KeyProvider kp = createProvider(uri, conf);
+ kp.createKey("k1", new byte[16],
+ new KeyProvider.Options(conf));
+ kp.createKey("k2", new byte[16],
+ new KeyProvider.Options(conf));
+ kp.createKey("k3", new byte[16],
+ new KeyProvider.Options(conf));
+ return kp;
+ }
+ });
+ return kp;
+ }
+ };
+
+ runServerWithZooKeeper(zkDTSM, zkSigner, c);
+ }
+
+ @Test
+ public void testKMSHAZooKeeperDelegationToken() throws Exception {
+ final int kmsSize = 2;
+ doKMSWithZKWithDelegationToken(true, true, kmsSize);
}
+ private void doKMSWithZKWithDelegationToken(boolean zkDTSM, boolean zkSigner,
+ int kmsSize) throws Exception {
+ // Create a KMSCallable to execute requests after ZooKeeper and KMS are up.
+ KMSCallable<Void> c = new KMSCallable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ final Configuration conf = new Configuration();
+ conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+ final URI[] uris = createKMSHAUri(getKMSHAUrl());
+ final Credentials credentials = new Credentials();
+ // Create a UGI without Kerberos auth. It will authenticate with tokens.
+ final UserGroupInformation nonKerberosUgi =
+ UserGroupInformation.getCurrentUser();
+ final String lbUri = generateLoadBalancingKeyProviderUriString();
+ final LoadBalancingKMSClientProvider lbkp =
+ createHAProvider(URI.create(lbUri), uris, conf);
+ conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
+ // get delegation tokens using kerberos login
+ doAs("SET_KEY_MATERIAL",
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ KeyProviderDelegationTokenExtension kpdte =
+ KeyProviderDelegationTokenExtension.
+ createKeyProviderDelegationTokenExtension(lbkp);
+ kpdte.addDelegationTokens("foo", credentials);
+ return null;
+ }
+ });
+
+ nonKerberosUgi.addCredentials(credentials);
+ // Access KMS using delegation token for authentication, no Kerberos.
+ nonKerberosUgi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Create a kms client with one provider at a time. Must use one
+ // provider so that if it fails to authenticate, it does not fall
+ // back to the next KMS instance.
+ // Should succeed because it has delegation tokens for any instance.
+ int i = 0;
+ for (KMSClientProvider provider : lbkp.getProviders()) {
+ final String key = "k" + i++;
+ LOG.info("Connect to {} to create key {}.", provider, key);
+ provider.createKey(key, new KeyProvider.Options(conf));
+ }
+ return null;
+ }
+ });
+
+ final Collection<Token<? extends TokenIdentifier>> tokens =
+ credentials.getAllTokens();
+ doAs("foo", new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ assertEquals(1, tokens.size());
+ Token token = tokens.iterator().next();
+ assertEquals(KMSDelegationToken.TOKEN_KIND, token.getKind());
+ LOG.info("Got dt for token: {}", token);
+ final long tokenLife = token.renew(conf);
+ LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife);
+ Thread.sleep(10);
+ final long newTokenLife = token.renew(conf);
+ LOG.info("Renewed token {}, new lifetime:{}", token, newTokenLife);
+ assertTrue(newTokenLife > tokenLife);
+
+ // test delegation token cancellation
+ LOG.info("Got dt for token: {}", token);
+ token.cancel(conf);
+ LOG.info("Cancelled token {}", token);
+ try {
+ token.renew(conf);
+ fail("should not be able to renew a canceled token");
+ } catch (Exception e) {
+ LOG.info("Expected exception when renewing token", e);
+ }
+ return null;
+ }
+ });
+
+ final Credentials newCredentials = new Credentials();
+ doAs("SET_KEY_MATERIAL",
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ KeyProviderDelegationTokenExtension kpdte =
+ KeyProviderDelegationTokenExtension.
+ createKeyProviderDelegationTokenExtension(lbkp);
+ kpdte.addDelegationTokens("foo", newCredentials);
+ return null;
+ }
+ });
+
+ doAs("foo", new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ KMSClientProvider kp1 = lbkp.getProviders()[0];
+ URL[] urls = getKMSHAUrl();
+ final Collection<Token<? extends TokenIdentifier>> tokens =
+ newCredentials.getAllTokens();
+ assertEquals(1, tokens.size());
+ Token token = tokens.iterator().next();
+ assertEquals(KMSDelegationToken.TOKEN_KIND,
+ token.getKind());
+ // Testing backward compatibility of token renewal and cancellation.
+ // Set the token service to ip:port format and test to renew/cancel.
+ Text text = SecurityUtil.buildTokenService(
+ new InetSocketAddress(urls[0].getHost(), urls[0].getPort()));
+ token.setService(text);
+ conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri);
+ long tokenLife = 0L;
+ for (KMSClientProvider kp : lbkp.getProviders()) {
+ long renewedTokenLife = token.renew(conf);
+ LOG.info("Renewed token of kind {}, new lifetime:{}",
+ token.getKind(), renewedTokenLife);
+ assertTrue(renewedTokenLife > tokenLife);
+ tokenLife = renewedTokenLife;
+ Thread.sleep(10);
+ }
+ token.cancel(conf);
+ try {
+ token.renew(conf);
+ fail("should not be able to renew a canceled token");
+ } catch (IOException e) {
+ LOG.info("Expected exception when renewing token", e);
+ }
+ return null;
+ }
+ });
+ return null;
+ }
+ };
+ runServerWithZooKeeper(zkDTSM, zkSigner, c, kmsSize);
+ }
@Test
public void testProxyUserKerb() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 5511657..ce1083d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
@@ -205,7 +206,7 @@ import com.google.common.net.InetAddresses;
********************************************************/
@InterfaceAudience.Private
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
- DataEncryptionKeyFactory {
+ DataEncryptionKeyFactory, KeyProviderTokenIssuer {
public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
private final Configuration conf;
@@ -684,6 +685,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return (dtService != null) ? dtService.toString() : null;
}
+ @Override
+ public Token<?>getDelegationToken(String renewer) throws IOException {
+ return getDelegationToken(renewer == null ? null : new Text(renewer));
+ }
+
/**
* @see ClientProtocol#getDelegationToken(Text)
*/
@@ -3029,7 +3035,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return HEDGED_READ_METRIC;
}
- URI getKeyProviderUri() throws IOException {
+ @Override
+ public URI getKeyProviderUri() throws IOException {
return HdfsKMSUtil.getKeyProviderUri(ugi, namenodeUri,
getServerDefaults().getKeyProviderUri(), conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec86b44/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 3532a71..12bc73c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -102,8 +102,8 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.ChunkedArrayList;
import org.apache.hadoop.util.Progressable;
@@ -2818,11 +2818,13 @@ public class DistributedFileSystem extends FileSystem
}
@Override
- public Token<?>[] addDelegationTokens(
- final String renewer, Credentials credentials) throws IOException {
- Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
- return HdfsKMSUtil.addDelegationTokensForKeyProvider(
- this, renewer, credentials, uri, tokens);
+ public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+ throws IOException {
+ KeyProvider keyProvider = getKeyProvider();
+ if (keyProvider instanceof DelegationTokenIssuer) {
+ return new DelegationTokenIssuer[]{(DelegationTokenIssuer)keyProvider};
+ }
+ return null;
}
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org