You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/09/17 22:01:38 UTC
[16/50] [abbrv] hadoop git commit: HADOOP-15660. ABFS: Add support for OAuth
Contributed by Da Zhou, Rajeev Bansal, and Junhua Gu.
HADOOP-15660. ABFS: Add support for OAuth
Contributed by Da Zhou, Rajeev Bansal, and Junhua Gu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9149b970
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9149b970
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9149b970
Branch: refs/heads/HADOOP-15407
Commit: 9149b9703e3ab09abdc087db129e82ad3f4cefa1
Parents: d6a4f39
Author: Thomas Marquardt <tm...@microsoft.com>
Authored: Sat Aug 18 18:53:32 2018 +0000
Committer: Thomas Marquardt <tm...@microsoft.com>
Committed: Mon Sep 17 19:54:01 2018 +0000
----------------------------------------------------------------------
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 149 ++++++--
.../fs/azurebfs/AzureBlobFileSystemStore.java | 26 +-
.../azurebfs/constants/ConfigurationKeys.java | 19 +
.../TokenAccessProviderException.java | 36 ++
.../services/AzureServiceErrorCode.java | 1 +
.../services/ListResultEntrySchema.java | 89 ++++-
.../fs/azurebfs/oauth2/AccessTokenProvider.java | 98 ++++++
.../azurebfs/oauth2/AzureADAuthenticator.java | 344 +++++++++++++++++++
.../hadoop/fs/azurebfs/oauth2/AzureADToken.java | 47 +++
.../oauth2/ClientCredsTokenProvider.java | 62 ++++
.../oauth2/CustomTokenProviderAdaptee.java | 75 ++++
.../oauth2/CustomTokenProviderAdapter.java | 57 +++
.../fs/azurebfs/oauth2/MsiTokenProvider.java | 48 +++
.../hadoop/fs/azurebfs/oauth2/QueryParams.java | 69 ++++
.../oauth2/RefreshTokenBasedTokenProvider.java | 57 +++
.../oauth2/UserPasswordTokenProvider.java | 66 ++++
.../hadoop/fs/azurebfs/oauth2/package-info.java | 18 +
.../hadoop/fs/azurebfs/services/AbfsClient.java | 18 +-
.../fs/azurebfs/services/AbfsHttpHeader.java | 2 +-
.../fs/azurebfs/services/AbfsRestOperation.java | 19 +-
.../hadoop/fs/azurebfs/services/AuthType.java | 27 ++
.../azurebfs/AbstractAbfsIntegrationTest.java | 35 +-
.../hadoop/fs/azurebfs/ITestAbfsClient.java | 2 +-
.../ITestAzureBlobFileSystemBackCompat.java | 4 +
.../ITestAzureBlobFileSystemFileStatus.java | 3 -
.../ITestAzureBlobFileSystemFinalize.java | 8 +-
.../azurebfs/ITestAzureBlobFileSystemFlush.java | 8 +-
.../azurebfs/ITestAzureBlobFileSystemOauth.java | 176 ++++++++++
.../ITestAzureBlobFileSystemRandomRead.java | 3 +
.../azurebfs/ITestFileSystemInitialization.java | 5 +-
.../azurebfs/ITestFileSystemRegistration.java | 11 +-
.../fs/azurebfs/ITestWasbAbfsCompatibility.java | 2 +
.../constants/TestConfigurationKeys.java | 6 +
.../contract/ABFSContractTestBinding.java | 14 +-
.../ITestAbfsFileSystemContractAppend.java | 19 +-
.../ITestAbfsFileSystemContractConcat.java | 17 +-
.../ITestAbfsFileSystemContractCreate.java | 17 +-
.../ITestAbfsFileSystemContractDelete.java | 17 +-
.../ITestAbfsFileSystemContractDistCp.java | 2 +-
...TestAbfsFileSystemContractGetFileStatus.java | 17 +-
.../ITestAbfsFileSystemContractMkdir.java | 17 +-
.../ITestAbfsFileSystemContractOpen.java | 17 +-
.../ITestAbfsFileSystemContractRename.java | 17 +-
...TestAbfsFileSystemContractRootDirectory.java | 16 +-
...ITestAbfsFileSystemContractSecureDistCp.java | 2 +-
.../ITestAbfsFileSystemContractSeek.java | 17 +-
.../ITestAbfsFileSystemContractSetTimes.java | 17 +-
.../ITestAzureBlobFileSystemBasics.java | 2 +-
.../fs/azurebfs/services/TestAbfsClient.java | 6 +-
.../fs/azurebfs/services/TestQueryParams.java | 72 ++++
.../utils/CleanUpAbfsTestContainer.java | 13 +-
.../src/test/resources/azure-bfs-test.xml | 128 ++++++-
52 files changed, 1768 insertions(+), 249 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index e647ae8..f26f562 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.azurebfs;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Map;
@@ -26,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
@@ -37,16 +37,26 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdaptee;
+import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdapter;
+import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.RefreshTokenBasedTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
+import org.apache.hadoop.util.ReflectionUtils;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_SSL_CHANNEL_MODE;
/**
@@ -58,81 +68,81 @@ public class AbfsConfiguration{
private final Configuration configuration;
private final boolean isSecure;
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE,
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE)
private int writeBufferSize;
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE,
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE)
private int readBufferSize;
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL,
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MIN_BACKOFF_INTERVAL,
DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL)
private int minBackoffInterval;
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL,
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_BACKOFF_INTERVAL,
DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL)
private int maxBackoffInterval;
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL,
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL,
DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL)
private int backoffInterval;
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES,
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_IO_RETRIES,
MinValue = 0,
DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS)
private int maxIoRetries;
- @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME,
+ @LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME,
MinValue = 0,
MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE,
DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE)
private long azureBlockSize;
- @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
+ @StringConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT)
private String azureBlockLocationHost;
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
MinValue = 1,
DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS)
private int maxConcurrentWriteThreads;
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN,
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_IN,
MinValue = 1,
DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS)
private int maxConcurrentReadThreads;
- @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND,
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_TOLERATE_CONCURRENT_APPEND,
DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND)
private boolean tolerateOobAppends;
- @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY,
+ @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ATOMIC_RENAME_KEY,
DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
private String azureAtomicDirs;
- @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
private boolean createRemoteFileSystemDuringInitialization;
- @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION,
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION,
DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION)
private boolean skipUserGroupMetadataDuringInitialization;
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH,
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_QUEUE_DEPTH,
DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH)
private int readAheadQueueDepth;
- @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ENABLE_FLUSH,
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH,
DefaultValue = FileSystemConfigurations.DEFAULT_ENABLE_FLUSH)
private boolean enableFlush;
- @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY,
+ @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
DefaultValue = "")
private String userAgentId;
@@ -140,7 +150,7 @@ public class AbfsConfiguration{
public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException {
this.configuration = configuration;
- this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false);
+ this.isSecure = this.configuration.getBoolean(FS_AZURE_SECURE_MODE, false);
validateStorageAccountKeys();
Field[] fields = this.getClass().getDeclaredFields();
@@ -161,17 +171,17 @@ public class AbfsConfiguration{
}
public boolean isEmulator() {
- return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false);
+ return this.getConfiguration().getBoolean(FS_AZURE_EMULATOR_ENABLED, false);
}
public boolean isSecureMode() {
- return this.isSecure;
+ return isSecure;
}
public String getStorageAccountKey(final String accountName) throws AzureBlobFileSystemException {
String key;
String keyProviderClass =
- configuration.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName);
+ configuration.get(AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName);
KeyProvider keyProvider;
if (keyProviderClass == null) {
@@ -278,19 +288,88 @@ public class AbfsConfiguration{
return configuration.getEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DEFAULT_FS_AZURE_SSL_CHANNEL_MODE);
}
+ /**
+ * Looks up the authentication type configured for an account.
+ *
+ * @param accountName account name appended to the auth type key prefix
+ * @return the configured auth type, or {@code AuthType.SharedKey} when none is set
+ */
+ public AuthType getAuthType(final String accountName) {
+ final String authTypeKey = FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName;
+ return configuration.getEnum(authTypeKey, AuthType.SharedKey);
+ }
+
+ /**
+ * Builds the access token provider configured for the given account.
+ *
+ * <p>For {@code AuthType.OAuth} the provider class is read from
+ * {@code FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName} and must be one of
+ * the four built-in providers handled below; their settings are read through
+ * {@code getPasswordString}. For {@code AuthType.Custom} the configured
+ * {@code CustomTokenProviderAdaptee} is instantiated, initialized with this configuration
+ * and the account name, and wrapped in a {@code CustomTokenProviderAdapter}.
+ *
+ * @param accountName account name appended to each per-account configuration key prefix
+ * @return the token provider for the account
+ * @throws TokenAccessProviderException if the auth type is neither OAuth nor Custom, or if
+ * any exception other than an IllegalArgumentException is raised while loading the provider
+ * @throws IllegalArgumentException if the provider class setting is missing or names an
+ * unsupported class; an IllegalArgumentException raised while creating the provider is
+ * rethrown unchanged
+ */
+ public AccessTokenProvider getTokenProvider(final String accountName) throws TokenAccessProviderException {
+ // Reuse the public lookup so both methods always agree on the key and the default.
+ AuthType authType = getAuthType(accountName);
+ if (authType == AuthType.OAuth) {
+ try {
+ String configKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName;
+ Class<? extends AccessTokenProvider> tokenProviderClass =
+ configuration.getClass(configKey, null, AccessTokenProvider.class);
+ if (tokenProviderClass == null) {
+ // Same wording as the Custom branch, so a missing setting names the key to fix
+ // instead of reporting "Failed to initialize null".
+ throw new IllegalArgumentException(
+ String.format("The configuration value for \"%s\" is invalid.", configKey));
+ }
+ if (tokenProviderClass == ClientCredsTokenProvider.class) {
+ String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountName);
+ String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName);
+ String clientSecret = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + accountName);
+ return new ClientCredsTokenProvider(authEndpoint, clientId, clientSecret);
+ } else if (tokenProviderClass == UserPasswordTokenProvider.class) {
+ String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountName);
+ String username = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_NAME + accountName);
+ String password = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD + accountName);
+ return new UserPasswordTokenProvider(authEndpoint, username, password);
+ } else if (tokenProviderClass == MsiTokenProvider.class) {
+ String tenantGuid = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT + accountName);
+ String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName);
+ return new MsiTokenProvider(tenantGuid, clientId);
+ } else if (tokenProviderClass == RefreshTokenBasedTokenProvider.class) {
+ String refreshToken = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN + accountName);
+ String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName);
+ return new RefreshTokenBasedTokenProvider(clientId, refreshToken);
+ } else {
+ throw new IllegalArgumentException("Failed to initialize " + tokenProviderClass);
+ }
+ } catch(IllegalArgumentException e) {
+ // Configuration mistakes are surfaced unchanged rather than wrapped.
+ throw e;
+ } catch (Exception e) {
+ throw new TokenAccessProviderException("Unable to load OAuth token provider class.", e);
+ }
+
+ } else if (authType == AuthType.Custom) {
+ try {
+ String configKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName;
+ Class<? extends CustomTokenProviderAdaptee> customTokenProviderClass =
+ configuration.getClass(configKey, null,
+ CustomTokenProviderAdaptee.class);
+ if (customTokenProviderClass == null) {
+ throw new IllegalArgumentException(
+ String.format("The configuration value for \"%s\" is invalid.", configKey));
+ }
+ CustomTokenProviderAdaptee azureTokenProvider = ReflectionUtils
+ .newInstance(customTokenProviderClass, configuration);
+ if (azureTokenProvider == null) {
+ throw new IllegalArgumentException("Failed to initialize " + customTokenProviderClass);
+ }
+ azureTokenProvider.initialize(configuration, accountName);
+ return new CustomTokenProviderAdapter(azureTokenProvider);
+ } catch(IllegalArgumentException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new TokenAccessProviderException("Unable to load custom token provider class.", e);
+ }
+
+ } else {
+ throw new TokenAccessProviderException(String.format(
+ "Invalid auth type: %s is being used, expecting OAuth", authType));
+ }
+ }
+
void validateStorageAccountKeys() throws InvalidConfigurationValueException {
Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
- ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
- this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX);
+ FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
+ this.storageAccountKeys = configuration.getValByRegex(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX);
- for (Map.Entry<String, String> account : this.storageAccountKeys.entrySet()) {
+ for (Map.Entry<String, String> account : storageAccountKeys.entrySet()) {
validator.validate(account.getValue());
}
}
int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
- String value = this.configuration.get(validator.ConfigurationKey());
+ String value = configuration.get(validator.ConfigurationKey());
// validate
return new IntegerConfigurationBasicValidator(
@@ -303,7 +382,7 @@ public class AbfsConfiguration{
long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
- String value = this.configuration.get(validator.ConfigurationKey());
+ String value = configuration.get(validator.ConfigurationKey());
// validate
return new LongConfigurationBasicValidator(
@@ -316,7 +395,7 @@ public class AbfsConfiguration{
String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class);
- String value = this.configuration.get(validator.ConfigurationKey());
+ String value = configuration.get(validator.ConfigurationKey());
// validate
return new StringConfigurationBasicValidator(
@@ -327,7 +406,7 @@ public class AbfsConfiguration{
String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class));
- String value = this.configuration.get(validator.ConfigurationKey());
+ String value = configuration.get(validator.ConfigurationKey());
// validate
return new Base64StringConfigurationBasicValidator(
@@ -338,7 +417,7 @@ public class AbfsConfiguration{
boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class);
- String value = this.configuration.get(validator.ConfigurationKey());
+ String value = configuration.get(validator.ConfigurationKey());
// validate
return new BooleanConfigurationBasicValidator(
@@ -347,6 +426,14 @@ public class AbfsConfiguration{
validator.ThrowIfInvalid()).validate(value);
}
+ /**
+ * Reads a setting through {@code Configuration.getPassword} and returns it as a String.
+ *
+ * @param key full configuration key to look up
+ * @return the value as a String, or {@code null} when the lookup yields no value
+ * @throws IOException if the underlying password lookup fails
+ */
+ String getPasswordString(String key) throws IOException {
+ final char[] secret = configuration.getPassword(key);
+ return secret == null ? null : new String(secret);
+ }
+
@VisibleForTesting
void setReadBufferSize(int bufferSize) {
this.readBufferSize = bufferSize;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index ba72149..b8da35b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -67,10 +67,12 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.permission.FsAction;
@@ -487,16 +489,22 @@ public class AzureBlobFileSystemStore {
throw new InvalidUriException(uri.toString());
}
- int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
- if (dotIndex <= 0) {
- throw new InvalidUriException(
- uri.toString() + " - account name is not fully qualified.");
+ SharedKeyCredentials creds = null;
+ AccessTokenProvider tokenProvider = null;
+
+ if (abfsConfiguration.getAuthType(accountName) == AuthType.SharedKey) {
+ int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
+ if (dotIndex <= 0) {
+ throw new InvalidUriException(
+ uri.toString() + " - account name is not fully qualified.");
+ }
+ creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
+ abfsConfiguration.getStorageAccountKey(accountName));
+ } else {
+ tokenProvider = abfsConfiguration.getTokenProvider(accountName);
}
- SharedKeyCredentials creds =
- new SharedKeyCredentials(accountName.substring(0, dotIndex),
- this.abfsConfiguration.getStorageAccountKey(accountName));
- this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy());
+ this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider);
}
private String getRelativePath(final Path path) {
@@ -537,7 +545,7 @@ public class AzureBlobFileSystemStore {
Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN).parse(lastModifiedTime);
parsedTime = utcDate.getTime();
} catch (ParseException e) {
- LOG.error("Failed to parse the date {0}", lastModifiedTime);
+ LOG.error("Failed to parse the date {}", lastModifiedTime);
} finally {
return parsedTime;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 16ddd90..ffdf700 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -60,5 +60,24 @@ public final class ConfigurationKeys {
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider.";
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
+ /** Prefix for auth type properties: {@value}. */
+ public static final String FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME = "fs.azure.account.auth.type.";
+ /** Prefix for oauth token provider type: {@value}. */
+ public static final String FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME = "fs.azure.account.oauth.provider.type.";
+ /** Prefix for oauth AAD client id: {@value}. */
+ public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID = "fs.azure.account.oauth2.client.id.";
+ /** Prefix for oauth AAD client secret: {@value}. */
+ public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET = "fs.azure.account.oauth2.client.secret.";
+ /** Prefix for oauth AAD client endpoint: {@value}. */
+ public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT = "fs.azure.account.oauth2.client.endpoint.";
+ /** Prefix for oauth msi tenant id: {@value}. */
+ public static final String FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT = "fs.azure.account.oauth2.msi.tenant.";
+ /** Prefix for oauth user name: {@value}. */
+ public static final String FS_AZURE_ACCOUNT_OAUTH_USER_NAME = "fs.azure.account.oauth2.user.name.";
+ /** Prefix for oauth user password: {@value}. */
+ public static final String FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD = "fs.azure.account.oauth2.user.password.";
+ /** Prefix for oauth refresh token: {@value}. */
+ public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token.";
+
private ConfigurationKeys() {}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java
new file mode 100644
index 0000000..b40b34a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Thrown if there is a problem instantiating an AccessTokenProvider or retrieving a configuration
+ * using an AccessTokenProvider object.
+ */
+@InterfaceAudience.Private
+public class TokenAccessProviderException extends AzureBlobFileSystemException {
+
+ /**
+ * Creates an exception that carries only a description of the failure.
+ *
+ * @param message description of the failure
+ */
+ public TokenAccessProviderException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates an exception that also records the underlying failure.
+ *
+ * @param message description of the failure
+ * @param cause the underlying failure, exposed through {@code getCause()}
+ */
+ public TokenAccessProviderException(String message, Throwable cause) {
+ super(message);
+ // Attach the cause explicitly; passing only the message would discard the root failure.
+ // NOTE(review): assumes the single-argument super constructor leaves the cause unset,
+ // otherwise initCause would throw IllegalStateException - confirm.
+ initCause(cause);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
index a89f339..63bf8d0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
@@ -44,6 +44,7 @@ public enum AzureServiceErrorCode {
INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."),
EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."),
INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null),
+ AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null),
UNKNOWN(null, -1, null);
private final String errorCode;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java
index 903ff69..1de9dfa 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java
@@ -58,12 +58,30 @@ public class ListResultEntrySchema {
private Long contentLength;
/**
+ * The owner property.
+ */
+ @JsonProperty(value = "owner")
+ private String owner;
+
+ /**
+ * The group property.
+ */
+ @JsonProperty(value = "group")
+ private String group;
+
+ /**
+ * The permissions property.
+ */
+ @JsonProperty(value = "permissions")
+ private String permissions;
+
+ /**
* Get the name value.
*
* @return the name value
*/
public String name() {
- return this.name;
+ return name;
}
/**
@@ -83,7 +101,7 @@ public class ListResultEntrySchema {
* @return the isDirectory value
*/
public Boolean isDirectory() {
- return this.isDirectory;
+ return isDirectory;
}
/**
@@ -103,7 +121,7 @@ public class ListResultEntrySchema {
* @return the lastModified value
*/
public String lastModified() {
- return this.lastModified;
+ return lastModified;
}
/**
@@ -123,7 +141,7 @@ public class ListResultEntrySchema {
* @return the etag value
*/
public String eTag() {
- return this.eTag;
+ return eTag;
}
/**
@@ -143,7 +161,7 @@ public class ListResultEntrySchema {
* @return the contentLength value
*/
public Long contentLength() {
- return this.contentLength;
+ return contentLength;
}
/**
@@ -157,4 +175,65 @@ public class ListResultEntrySchema {
return this;
}
+ /**
+ * Get the owner value.
+ *
+ * @return the owner value
+ */
+ public String owner() {
+ return owner;
+ }
+
+ /**
+ * Set the owner value.
+ *
+ * @param owner the owner value to set
+ * @return the ListResultEntrySchema object itself.
+ */
+ public ListResultEntrySchema withOwner(final String owner) {
+ this.owner = owner;
+ return this;
+ }
+
+ /**
+ * Get the group value.
+ *
+ * @return the group value
+ */
+ public String group() {
+ return group;
+ }
+
+ /**
+ * Set the group value.
+ *
+ * @param group the group value to set
+ * @return the ListResultEntrySchema object itself.
+ */
+ public ListResultEntrySchema withGroup(final String group) {
+ this.group = group;
+ return this;
+ }
+
+ /**
+ * Get the permissions value.
+ *
+ * @return the permissions value
+ */
+ public String permissions() {
+ return permissions;
+ }
+
+ /**
+ * Set the permissions value.
+ *
+ * @param permissions the permissions value to set
+ * @return the ListResultEntrySchema object itself.
+ */
+ public ListResultEntrySchema withPermissions(final String permissions) {
+ this.permissions = permissions;
+ return this;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java
new file mode 100644
index 0000000..72f37a1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Returns an Azure Active Directory token when requested. The provider
+ * caches the token it has retrieved and is responsible for checking expiry
+ * and refreshing as needed.
+ *
+ * In other words, this is a token cache that fetches a token on request
+ * whenever no token is cached or the cached one is about to expire.
+ */
+public abstract class AccessTokenProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
+
+  // 5 minutes in milliseconds
+  private static final long FIVE_MINUTES = 300 * 1000;
+
+  // Last token returned by refreshToken(); null until the first fetch.
+  private AzureADToken token;
+
+  /**
+   * Returns the {@link AzureADToken} cached (or retrieved) by this instance.
+   * The method is synchronized, so concurrent callers trigger at most one
+   * refresh at a time.
+   *
+   * @return {@link AzureADToken} containing the access token
+   * @throws IOException if there is an error fetching the token
+   */
+  public synchronized AzureADToken getToken() throws IOException {
+    if (isTokenAboutToExpire()) {
+      LOG.debug("AAD Token is missing or expired:"
+          + " Calling refresh-token from abstract base class");
+      token = refreshToken();
+    }
+    return token;
+  }
+
+  /**
+   * Fetches the access token. Derived classes should override this method
+   * to actually get the token from Azure Active Directory.
+   *
+   * This method is called when no token is cached yet, and again whenever
+   * the cached token is about to expire.
+   *
+   * @return {@link AzureADToken} containing the access token
+   * @throws IOException if there is an error fetching the token
+   */
+  protected abstract AzureADToken refreshToken() throws IOException;
+
+  /**
+   * Checks if the token is about to expire in the next 5 minutes.
+   * The 5 minute allowance is to allow for clock skew and also to
+   * allow for token to be refreshed in that much time.
+   *
+   * @return true if there is no token or it expires in the next 5 minutes
+   */
+  private boolean isTokenAboutToExpire() {
+    if (token == null) {
+      LOG.debug("AADToken: no token. Returning expiring=true");
+      return true; // no token should have same response as expired token
+    }
+    // Shift "now" forward by the allowance instead of shifting the expiry back.
+    final long approximatelyNow = System.currentTimeMillis() + FIVE_MINUTES;
+    // getExpiry() hands out a copy, so fetch it once and reuse it.
+    final Date expiry = token.getExpiry();
+    final boolean expiring = expiry.getTime() < approximatelyNow;
+    if (expiring) {
+      // Parameterized form: the message is only assembled when debug is on.
+      LOG.debug("AADToken: token expiring: {} : Five-minute window: {}",
+          expiry, new Date(approximatelyNow));
+    }
+    return expiring;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
new file mode 100644
index 0000000..e82dc95
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.Hashtable;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
+
+/**
+ * This class provides convenience methods to obtain AAD tokens.
+ * While convenient, it is not necessary to use these methods to
+ * obtain the tokens. Customers can use any other method
+ * (e.g., using the adal4j client) to obtain tokens.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class AzureADAuthenticator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AzureADAuthenticator.class);
+ private static final String RESOURCE_NAME = "https://storage.azure.com/";
+ private static final int CONNECT_TIMEOUT = 30 * 1000;
+ private static final int READ_TIMEOUT = 30 * 1000;
+
+ private AzureADAuthenticator() {
+ // no operation
+ }
+
+  /**
+   * Gets an Azure Active Directory token using the client ID and secret key
+   * of a service principal (that is, a Web App in Azure Active Directory).
+   *
+   * Azure Active Directory allows users to set up a web app as a
+   * service principal. Users can optionally obtain service principal keys
+   * from AAD. This method gets a token using a service principal's client ID
+   * and keys. In addition, it needs the token endpoint associated with the
+   * user's directory.
+   *
+   * @param authEndpoint the OAuth 2.0 token endpoint associated
+   *                     with the user's directory (obtain from
+   *                     Active Directory configuration)
+   * @param clientId the client ID (GUID) of the client web app
+   *                 obtained from Azure Active Directory configuration
+   * @param clientSecret the secret key of the client web app
+   * @return {@link AzureADToken} obtained using the creds
+   * @throws IOException if there is a failure in connecting to Azure AD
+   * @throws NullPointerException if any argument is {@code null}
+   */
+  public static AzureADToken getTokenUsingClientCreds(String authEndpoint,
+      String clientId, String clientSecret)
+      throws IOException {
+    Preconditions.checkNotNull(authEndpoint, "authEndpoint");
+    Preconditions.checkNotNull(clientId, "clientId");
+    Preconditions.checkNotNull(clientSecret, "clientSecret");
+
+    QueryParams qp = new QueryParams();
+    qp.add("resource", RESOURCE_NAME);
+    qp.add("grant_type", "client_credentials");
+    qp.add("client_id", clientId);
+    qp.add("client_secret", clientSecret);
+    // Only the client ID is logged; the secret must never reach the log.
+    LOG.debug("AADToken: starting to fetch token using client creds for client ID {}", clientId);
+
+    // No extra headers; a null HTTP method makes the call default to POST.
+    return getTokenCall(authEndpoint, qp.serialize(), null, null);
+  }
+
+  /**
+   * Gets AAD token from the local virtual machine's VM extension. This only
+   * works on an Azure VM with MSI extension enabled.
+   *
+   * @param tenantGuid  (optional) The guid of the AAD tenant. Can be
+   *                    {@code null} or empty, in which case no authority is sent.
+   * @param clientId    (optional) The clientId guid of the MSI service
+   *                    principal to use. Can be {@code null} or empty, in
+   *                    which case no client ID is sent.
+   * @param bypassCache {@code boolean} specifying whether a cached token is
+   *                    acceptable or a fresh token request should be made to AAD
+   * @return {@link AzureADToken} obtained using the creds
+   * @throws IOException if there is a failure in obtaining the token
+   */
+  public static AzureADToken getTokenFromMsi(String tenantGuid, String clientId,
+      boolean bypassCache) throws IOException {
+    String authEndpoint = "http://169.254.169.254/metadata/identity/oauth2/token";
+
+    QueryParams qp = new QueryParams();
+    qp.add("api-version", "2018-02-01");
+    qp.add("resource", RESOURCE_NAME);
+
+    // Both identifiers are documented as optional, so null is treated the
+    // same as empty rather than being rejected.
+    if (tenantGuid != null && tenantGuid.length() > 0) {
+      String authority = "https://login.microsoftonline.com/" + tenantGuid;
+      qp.add("authority", authority);
+    }
+
+    if (clientId != null && clientId.length() > 0) {
+      qp.add("client_id", clientId);
+    }
+
+    if (bypassCache) {
+      qp.add("bypass_cache", "true");
+    }
+
+    Hashtable<String, String> headers = new Hashtable<>();
+    headers.put("Metadata", "true");
+
+    LOG.debug("AADToken: starting to fetch token using MSI");
+    // GET: the serialized parameters are appended to the URL as a query string.
+    return getTokenCall(authEndpoint, qp.serialize(), headers, "GET");
+  }
+
+  /**
+   * Gets Azure Active Directory token using refresh token.
+   *
+   * @param clientId the client ID (GUID) of the client web app obtained from
+   *                 Azure Active Directory configuration; may be {@code null},
+   *                 in which case no client ID is sent
+   * @param refreshToken the refresh token; must not be {@code null}
+   * @return {@link AzureADToken} obtained using the refresh token
+   * @throws IOException if there is a failure in connecting to Azure AD
+   * @throws NullPointerException if {@code refreshToken} is {@code null}
+   */
+  public static AzureADToken getTokenUsingRefreshToken(String clientId,
+      String refreshToken) throws IOException {
+    // Fail fast with a named argument; otherwise a null value only surfaces
+    // later as an anonymous NullPointerException while URL-encoding the body.
+    Preconditions.checkNotNull(refreshToken, "refreshToken");
+
+    String authEndpoint = "https://login.microsoftonline.com/Common/oauth2/token";
+    QueryParams qp = new QueryParams();
+    qp.add("grant_type", "refresh_token");
+    qp.add("refresh_token", refreshToken);
+    if (clientId != null) {
+      qp.add("client_id", clientId);
+    }
+    LOG.debug("AADToken: starting to fetch token using refresh token for client ID {}", clientId);
+    // No extra headers; a null HTTP method makes the call default to POST.
+    return getTokenCall(authEndpoint, qp.serialize(), null, null);
+  }
+
+  /**
+   * Failure of a single token request that received an HTTP response which
+   * could not be used. Carries the HTTP status code and the request id
+   * reported by the server alongside the diagnostic message.
+   */
+  private static class HttpException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    // Set once at construction; an exception's state should not change.
+    private final int httpErrorCode;
+    private final String requestId;
+
+    /**
+     * @param httpErrorCode the HTTP status code of the response
+     * @param requestId the request id taken from the response
+     * @param message the diagnostic message
+     */
+    HttpException(int httpErrorCode, String requestId, String message) {
+      super(message);
+      this.httpErrorCode = httpErrorCode;
+      this.requestId = requestId;
+    }
+
+    /** @return the HTTP status code of the failed response */
+    public int getHttpErrorCode() {
+      return this.httpErrorCode;
+    }
+
+    /** @return the request id taken from the failed response */
+    public String getRequestId() {
+      return this.requestId;
+    }
+  }
+
+  /**
+   * Requests a token, repeating the request for as long as it fails and the
+   * retry policy allows another attempt.
+   *
+   * NOTE(review): this loop does not wait between attempts; confirm whether
+   * a back-off delay from the retry policy was intended.
+   *
+   * @param authEndpoint the token endpoint URL
+   * @param body the URL-encoded request parameters
+   * @param headers extra request headers, or {@code null} for none
+   * @param httpMethod the HTTP method, or {@code null} for the default
+   * @return the token returned by the successful attempt
+   * @throws IOException the failure of the last attempt; for an unusable HTTP
+   *         response this is the original exception, which carries the
+   *         status code and request id
+   */
+  private static AzureADToken getTokenCall(String authEndpoint, String body,
+      Hashtable<String, String> headers, String httpMethod)
+      throws IOException {
+    AzureADToken token = null;
+    ExponentialRetryPolicy retryPolicy
+        = new ExponentialRetryPolicy(3, 0, 1000, 2);
+
+    int httperror;
+    IOException ex;
+    boolean succeeded;
+    int retryCount = 0;
+    do {
+      httperror = 0;
+      ex = null;
+      try {
+        token = getTokenSingleCall(authEndpoint, body, headers, httpMethod);
+      } catch (HttpException e) {
+        // Keep the exception itself rather than only its message, so the
+        // status code, request id and stack trace survive to the caller.
+        httperror = e.getHttpErrorCode();
+        ex = e;
+      } catch (IOException e) {
+        // Failure without an HTTP status; httperror stays 0 for the policy.
+        ex = e;
+      }
+      succeeded = (ex == null);
+      retryCount++;
+    } while (!succeeded && retryPolicy.shouldRetry(retryCount, httperror));
+    if (!succeeded) {
+      throw ex;
+    }
+    return token;
+  }
+
+  /**
+   * Performs one HTTP request against the token endpoint and parses the
+   * token from a successful JSON response.
+   *
+   * @param authEndpoint the token endpoint URL
+   * @param payload the URL-encoded parameters; sent as the request body for
+   *                POST and appended as the query string for GET
+   * @param headers extra request headers, or {@code null} for none
+   * @param httpMethod the HTTP method; {@code null} means POST
+   * @return the parsed token
+   * @throws HttpException if the response is not a 200 with a non-empty
+   *         {@code application/json} body
+   * @throws IOException if the connection or the transfer fails
+   */
+  private static AzureADToken getTokenSingleCall(
+      String authEndpoint, String payload, Hashtable<String, String> headers, String httpMethod)
+      throws IOException {
+
+    AzureADToken token = null;
+    HttpURLConnection conn = null;
+    String urlString = authEndpoint;
+
+    httpMethod = (httpMethod == null) ? "POST" : httpMethod;
+    if (httpMethod.equals("GET")) {
+      urlString = urlString + "?" + payload;
+    }
+
+    try {
+      URL url = new URL(urlString);
+      conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestMethod(httpMethod);
+      conn.setReadTimeout(READ_TIMEOUT);
+      conn.setConnectTimeout(CONNECT_TIMEOUT);
+
+      if (headers != null && headers.size() > 0) {
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+          conn.setRequestProperty(entry.getKey(), entry.getValue());
+        }
+      }
+      conn.setRequestProperty("Connection", "close");
+
+      if (httpMethod.equals("POST")) {
+        conn.setDoOutput(true);
+        conn.getOutputStream().write(payload.getBytes(StandardCharsets.UTF_8));
+      }
+
+      int httpResponseCode = conn.getResponseCode();
+      String requestId = conn.getHeaderField("x-ms-request-id");
+      String responseContentType = conn.getHeaderField("Content-Type");
+      long responseContentLength = conn.getHeaderFieldLong("Content-Length", 0);
+
+      requestId = requestId == null ? "" : requestId;
+      // The Content-Type header may be absent, so guard before inspecting it.
+      if (httpResponseCode == HttpURLConnection.HTTP_OK
+          && responseContentType != null
+          && responseContentType.startsWith("application/json") && responseContentLength > 0) {
+        InputStream httpResponseStream = conn.getInputStream();
+        token = parseTokenFromStream(httpResponseStream);
+      } else {
+        // For error statuses the body is on the error stream; getInputStream()
+        // would throw and the diagnostic below would never be produced.
+        InputStream bodyStream = conn.getErrorStream();
+        if (bodyStream == null && httpResponseCode < HttpURLConnection.HTTP_BAD_REQUEST) {
+          bodyStream = conn.getInputStream();
+        }
+        String responseBody = (bodyStream == null) ? "" : consumeInputStream(bodyStream, 1024);
+        String proxies = "none";
+        // NOTE(review): the JDK's own proxy properties are http.proxyHost and
+        // https.proxyHost; confirm these keys are the ones meant to be reported.
+        String httpProxy = System.getProperty("http.proxy");
+        String httpsProxy = System.getProperty("https.proxy");
+        if (httpProxy != null || httpsProxy != null) {
+          proxies = "http:" + httpProxy + "; https:" + httpsProxy;
+        }
+        String logMessage =
+            "AADToken: HTTP connection failed for getting token from AzureAD. Http response: "
+            + httpResponseCode + " " + conn.getResponseMessage()
+            + " Content-Type: " + responseContentType
+            + " Content-Length: " + responseContentLength
+            + " Request ID: " + requestId
+            + " Proxies: " + proxies
+            + " First 1K of Body: " + responseBody;
+        LOG.debug(logMessage);
+        throw new HttpException(httpResponseCode, requestId, logMessage);
+      }
+    } finally {
+      if (conn != null) {
+        conn.disconnect();
+      }
+    }
+    return token;
+  }
+
+  /**
+   * Parses the JSON token response and builds an {@link AzureADToken} from
+   * its {@code access_token} and {@code expires_in} fields. The stream is
+   * closed before this method returns, whether or not parsing succeeds.
+   *
+   * @param httpResponseStream the response body to parse
+   * @return the parsed token; its expiry is the current time plus
+   *         {@code expires_in} seconds (plus 0 if the field is absent)
+   * @throws IOException if the response cannot be read or parsed, or if
+   *         {@code expires_in} is not an integer
+   */
+  private static AzureADToken parseTokenFromStream(InputStream httpResponseStream) throws IOException {
+    AzureADToken token = new AzureADToken();
+    try {
+      int expiryPeriod = 0;
+
+      JsonFactory jf = new JsonFactory();
+      JsonParser jp = jf.createJsonParser(httpResponseStream);
+      String fieldName, fieldValue;
+      jp.nextToken();
+      while (jp.hasCurrentToken()) {
+        if (jp.getCurrentToken() == JsonToken.FIELD_NAME) {
+          fieldName = jp.getCurrentName();
+          jp.nextToken(); // field value
+          fieldValue = jp.getText();
+
+          if (fieldName.equals("access_token")) {
+            token.setAccessToken(fieldValue);
+          }
+          if (fieldName.equals("expires_in")) {
+            expiryPeriod = Integer.parseInt(fieldValue);
+          }
+        }
+        jp.nextToken();
+      }
+      jp.close();
+      long expiry = System.currentTimeMillis();
+      expiry = expiry + expiryPeriod * 1000L; // convert expiryPeriod to milliseconds and add
+      token.setExpiry(new Date(expiry));
+      LOG.debug("AADToken: fetched token with expiry {}", token.getExpiry());
+    } catch (NumberFormatException ex) {
+      // Report a malformed response through the declared exception type
+      // instead of letting an unchecked exception escape to callers.
+      LOG.debug("AADToken: got exception when parsing json token {}", ex.toString());
+      throw new IOException("AADToken: invalid expires_in value in token response", ex);
+    } catch (Exception ex) {
+      LOG.debug("AADToken: got exception when parsing json token {}", ex.toString());
+      throw ex;
+    } finally {
+      httpResponseStream.close();
+    }
+    return token;
+  }
+
+  /**
+   * Reads up to {@code length} bytes from the stream and decodes them as
+   * UTF-8. Reading stops at end of stream or once {@code length} bytes have
+   * been collected, whichever comes first. The stream is not closed.
+   *
+   * @param inStream the stream to read from
+   * @param length the maximum number of bytes to read
+   * @return the bytes read, decoded as a UTF-8 string
+   * @throws IOException if reading from the stream fails
+   */
+  private static String consumeInputStream(InputStream inStream, int length) throws IOException {
+    final byte[] buffer = new byte[length];
+    int filled = 0;
+
+    while (true) {
+      final int count = inStream.read(buffer, filled, length - filled);
+      if (count < 0) {
+        break; // end of stream
+      }
+      filled += count;
+      if (filled >= length) {
+        break; // buffer is full
+      }
+    }
+
+    return new String(buffer, 0, filled, StandardCharsets.UTF_8);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java
new file mode 100644
index 0000000..daa5a93
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.util.Date;
+
+
+/**
+ * Holder for an AAD access token and its expiry, used when making HTTP
+ * requests to Azure Data Lake Storage.
+ */
+public class AzureADToken {
+  private String accessToken;
+  private Date expiry;
+
+  /**
+   * @return the access token string, or {@code null} if none has been set
+   */
+  public String getAccessToken() {
+    return accessToken;
+  }
+
+  /**
+   * @param accessToken the access token string to store
+   */
+  public void setAccessToken(String accessToken) {
+    this.accessToken = accessToken;
+  }
+
+  /**
+   * Returns a copy of the expiry, so callers cannot alter the stored value.
+   *
+   * @return a new {@link Date} holding the expiry instant
+   */
+  public Date getExpiry() {
+    final long expiryMillis = expiry.getTime();
+    return new Date(expiryMillis);
+  }
+
+  /**
+   * Stores a copy of the given expiry, so later changes to the argument do
+   * not affect this token.
+   *
+   * @param expiry the instant at which the token expires
+   */
+  public void setExpiry(Date expiry) {
+    final long expiryMillis = expiry.getTime();
+    this.expiry = new Date(expiryMillis);
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/ClientCredsTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/ClientCredsTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/ClientCredsTokenProvider.java
new file mode 100644
index 0000000..9a46018
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/ClientCredsTokenProvider.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provides tokens based on client credentials.
+ */
+public class ClientCredsTokenProvider extends AccessTokenProvider {
+
+  // Logger named after this class, so its messages are attributed correctly.
+  private static final Logger LOG = LoggerFactory.getLogger(ClientCredsTokenProvider.class);
+
+  private final String authEndpoint;
+
+  private final String clientId;
+
+  private final String clientSecret;
+
+  /**
+   * Creates a provider that fetches tokens with the given client credentials.
+   *
+   * @param authEndpoint the token endpoint to request tokens from
+   * @param clientId the client ID
+   * @param clientSecret the client secret
+   * @throws NullPointerException if any argument is {@code null}
+   */
+  public ClientCredsTokenProvider(final String authEndpoint,
+      final String clientId, final String clientSecret) {
+
+    Preconditions.checkNotNull(authEndpoint, "authEndpoint");
+    Preconditions.checkNotNull(clientId, "clientId");
+    Preconditions.checkNotNull(clientSecret, "clientSecret");
+
+    this.authEndpoint = authEndpoint;
+    this.clientId = clientId;
+    this.clientSecret = clientSecret;
+  }
+
+  /**
+   * Fetches a new token using the configured client credentials.
+   *
+   * @return the newly fetched {@link AzureADToken}
+   * @throws IOException if the token cannot be fetched
+   */
+  @Override
+  protected AzureADToken refreshToken() throws IOException {
+    LOG.debug("AADToken: refreshing client-credential based token");
+    return AzureADAuthenticator.getTokenUsingClientCreds(authEndpoint, clientId, clientSecret);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java
new file mode 100644
index 0000000..7366a8d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+
+/**
+ * This interface provides an extensibility model for customizing the acquisition
+ * of Azure Active Directory Access Tokens. When "fs.azure.account.auth.type" is
+ * set to "Custom", implementors may use the
+ * "fs.azure.account.oauth.provider.type.{accountName}" configuration property
+ * to specify a class with a custom implementation of CustomTokenProviderAdaptee.
+ * This class will be dynamically loaded, initialized, and invoked to provide
+ * AAD Access Tokens and their Expiry.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface CustomTokenProviderAdaptee {
+
+ /**
+ * Initialize with supported configuration. This method is invoked when the
+ * file system's {@code initialize(URI, Configuration)} method is invoked.
+ *
+ * @param configuration Configuration object
+ * @param accountName Account Name
+ * @throws IOException if instance can not be configured.
+ */
+ void initialize(Configuration configuration, String accountName)
+ throws IOException;
+
+ /**
+ * Obtain the access token that should be added to https connection's header.
+ * How often this is called depends on the expiry time reported by
+ * {@link #getExpiryTime()}, so implementations should be performant.
+ * Implementations are responsible for any refreshing of the token.
+ *
+ * @return String containing the access token
+ * @throws IOException if there is an error fetching the token
+ */
+ String getAccessToken() throws IOException;
+
+ /**
+ * Obtain expiry time of the token. An implementation that is performant
+ * enough to maintain expiry itself, and that expects a
+ * {@link #getAccessToken()} call for every connection, can safely return
+ * the current or a past time.
+ *
+ * However, it is recommended to use the token expiry time received from
+ * Azure Active Directory.
+ *
+ * @return Date to expire access token retrieved from AAD.
+ */
+ Date getExpiryTime();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java
new file mode 100644
index 0000000..7bae415
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provides tokens based on custom implementation, following the Adapter Design
+ * Pattern.
+ */
+public final class CustomTokenProviderAdapter extends AccessTokenProvider {
+
+  // Logger named after this class, so its messages are attributed correctly.
+  private static final Logger LOG = LoggerFactory.getLogger(CustomTokenProviderAdapter.class);
+
+  private final CustomTokenProviderAdaptee adaptee;
+
+  /**
+   * Constructs a token provider based on the custom token provider.
+   *
+   * @param adaptee the custom token provider
+   * @throws NullPointerException if {@code adaptee} is {@code null}
+   */
+  public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee) {
+    Preconditions.checkNotNull(adaptee, "adaptee");
+    this.adaptee = adaptee;
+  }
+
+  /**
+   * Builds a token from the access token and expiry time currently reported
+   * by the custom provider.
+   *
+   * @return a new {@link AzureADToken} holding the adaptee's token and expiry
+   * @throws IOException if the custom provider fails to supply a token
+   */
+  @Override
+  protected AzureADToken refreshToken() throws IOException {
+    LOG.debug("AADToken: refreshing custom based token");
+
+    AzureADToken azureADToken = new AzureADToken();
+    azureADToken.setAccessToken(adaptee.getAccessToken());
+    // The expiry is copied by the token, so the adaptee must return a non-null Date.
+    azureADToken.setExpiry(adaptee.getExpiryTime());
+
+    return azureADToken;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
new file mode 100644
index 0000000..2deb9d2
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides tokens based on Azure VM's Managed Service Identity.
+ */
+public class MsiTokenProvider extends AccessTokenProvider {
+
+  // Logger named after this class, so its messages are attributed correctly.
+  private static final Logger LOG = LoggerFactory.getLogger(MsiTokenProvider.class);
+
+  private final String tenantGuid;
+
+  private final String clientId;
+
+  /**
+   * Creates a provider that requests tokens through MSI.
+   *
+   * @param tenantGuid the guid of the AAD tenant
+   * @param clientId the client ID of the MSI service principal to use
+   */
+  public MsiTokenProvider(final String tenantGuid, final String clientId) {
+    // The values are stored as given and only used when a token is fetched.
+    this.tenantGuid = tenantGuid;
+    this.clientId = clientId;
+  }
+
+  /**
+   * Fetches a new token from MSI without asking it to bypass its cache.
+   *
+   * @return the newly fetched {@link AzureADToken}
+   * @throws IOException if the token cannot be fetched
+   */
+  @Override
+  protected AzureADToken refreshToken() throws IOException {
+    LOG.debug("AADToken: refreshing token from MSI");
+    return AzureADAuthenticator.getTokenFromMsi(tenantGuid, clientId, false);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java
new file mode 100644
index 0000000..ff6e06f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class for building HTTP query parameter strings.
+ *
+ * <p>Parameters are collected with {@link #add(String, String)} and an
+ * optional API version with {@link #setApiVersion(String)};
+ * {@link #serialize()} renders them as {@code name=value} pairs joined by
+ * {@code &}. The rendered string is cached until the next mutation. The
+ * class performs no synchronization of its own.
+ */
+public class QueryParams {
+  private final Map<String, String> params = new HashMap<>();
+  private String apiVersion = null;
+  private String serializedString = null;
+
+  /**
+   * Adds a parameter, replacing any previous value stored under the same
+   * name, and invalidates the cached serialized form.
+   *
+   * @param name the parameter name (URL-encoded on serialization)
+   * @param value the parameter value (URL-encoded on serialization)
+   */
+  public void add(String name, String value) {
+    params.put(name, value);
+    serializedString = null;
+  }
+
+  /**
+   * Sets the value emitted as the trailing {@code api-version} parameter
+   * and invalidates the cached serialized form.
+   *
+   * @param apiVersion the API version; {@code null} omits the parameter
+   */
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+    serializedString = null;
+  }
+
+  /**
+   * Renders the parameters as a query string without a leading {@code ?}.
+   *
+   * <p>Names and values are URL-encoded as UTF-8. The API version, when
+   * set, is appended last and is written verbatim (not URL-encoded). The
+   * order of the other parameters follows the iteration order of the
+   * backing {@link HashMap} and is therefore unspecified.
+   *
+   * @return the serialized query string; empty if nothing has been set
+   * @throws IllegalStateException if the JVM does not support UTF-8, which
+   *         the Java platform guarantees cannot happen
+   */
+  public String serialize() {
+    if (serializedString == null) {
+      StringBuilder sb = new StringBuilder();
+      // The separator must start empty on every rendering. Keeping it as
+      // instance state made a second serialize() after a mutation emit a
+      // leading '&'.
+      String separator = "";
+      for (Map.Entry<String, String> entry : params.entrySet()) {
+        try {
+          // Encode both parts before appending so a failure can never
+          // leave a half-written pair in the buffer.
+          String encodedName = URLEncoder.encode(entry.getKey(), "UTF-8");
+          String encodedValue = URLEncoder.encode(entry.getValue(), "UTF-8");
+          sb.append(separator).append(encodedName).append('=').append(encodedValue);
+          separator = "&";
+        } catch (UnsupportedEncodingException ex) {
+          throw new IllegalStateException("UTF-8 encoding is not supported", ex);
+        }
+      }
+
+      if (apiVersion != null) {
+        sb.append(separator).append("api-version=").append(apiVersion);
+      }
+      serializedString = sb.toString();
+    }
+    return serializedString;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java
new file mode 100644
index 0000000..949d5bf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An {@link AccessTokenProvider} that obtains access tokens by presenting
+ * a refresh token.
+ */
+public class RefreshTokenBasedTokenProvider extends AccessTokenProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
+
+  private final String clientId;
+
+  private final String refreshToken;
+
+  /**
+   * Creates a token provider backed by the supplied refresh token.
+   *
+   * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration
+   * @param refreshToken the refresh token
+   * @throws NullPointerException if either argument is {@code null}
+   */
+  public RefreshTokenBasedTokenProvider(String clientId, String refreshToken) {
+    // checkNotNull returns its argument, so validation and assignment fold
+    // into one statement per field.
+    this.clientId = Preconditions.checkNotNull(clientId, "clientId");
+    this.refreshToken = Preconditions.checkNotNull(refreshToken, "refreshToken");
+  }
+
+  /**
+   * Requests a new access token through
+   * {@code AzureADAuthenticator.getTokenUsingRefreshToken}.
+   *
+   * @return the token produced by the authenticator
+   * @throws IOException propagated from the authenticator call
+   */
+  @Override
+  protected AzureADToken refreshToken() throws IOException {
+    LOG.debug("AADToken: refreshing refresh-token based token");
+    final AzureADToken renewed =
+        AzureADAuthenticator.getTokenUsingRefreshToken(clientId, refreshToken);
+    return renewed;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java
new file mode 100644
index 0000000..7504e9d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+/**
+ * An {@link AccessTokenProvider} that obtains tokens from an
+ * authentication endpoint using a username and password.
+ */
+public class UserPasswordTokenProvider extends AccessTokenProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
+
+  private final String authEndpoint;
+
+  private final String username;
+
+  private final String password;
+
+  /**
+   * Creates a provider for the given endpoint and credentials.
+   *
+   * @param authEndpoint the endpoint the token request is sent to
+   * @param username the user name presented to the endpoint
+   * @param password the password presented to the endpoint
+   * @throws NullPointerException if any argument is {@code null}
+   */
+  public UserPasswordTokenProvider(final String authEndpoint,
+      final String username, final String password) {
+    // checkNotNull returns its argument, so validation and assignment fold
+    // into one statement per field.
+    this.authEndpoint = Preconditions.checkNotNull(authEndpoint, "authEndpoint");
+    this.username = Preconditions.checkNotNull(username, "username");
+    this.password = Preconditions.checkNotNull(password, "password");
+  }
+
+  /**
+   * Requests a new token through
+   * {@code AzureADAuthenticator.getTokenUsingClientCreds}, passing the
+   * username and password in the credential positions.
+   *
+   * @return the token produced by the authenticator
+   * @throws IOException propagated from the authenticator call
+   */
+  @Override
+  protected AzureADToken refreshToken() throws IOException {
+    LOG.debug("AADToken: refreshing user-password based token");
+    final AzureADToken renewed =
+        AzureADAuthenticator.getTokenUsingClientCreds(authEndpoint, username, password);
+    return renewed;
+  }
+
+  /**
+   * Reads a password from the configuration and returns it as a string.
+   *
+   * @param conf the configuration to query via {@code getPassword}
+   * @param key the configuration key holding the password
+   * @return the password characters as a {@code String}
+   * @throws IOException if the configuration yields no password for
+   *         {@code key}, or if the lookup itself fails
+   */
+  private static String getPasswordString(Configuration conf, String key)
+      throws IOException {
+    final char[] secret = conf.getPassword(key);
+    if (secret != null) {
+      return String.valueOf(secret);
+    }
+    throw new IOException("Password " + key + " not found");
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java
new file mode 100644
index 0000000..bad1a85
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index e003ffd..f5c9f18 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
@@ -42,7 +43,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
/**
- * AbfsClient
+ * AbfsClient.
*/
public class AbfsClient {
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
@@ -54,9 +55,13 @@ public class AbfsClient {
private final AbfsConfiguration abfsConfiguration;
private final String userAgent;
+ private final AccessTokenProvider tokenProvider;
+
+
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
- final ExponentialRetryPolicy exponentialRetryPolicy) {
+ final ExponentialRetryPolicy exponentialRetryPolicy,
+ final AccessTokenProvider tokenProvider) {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
@@ -76,6 +81,7 @@ public class AbfsClient {
}
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
+ this.tokenProvider = tokenProvider;
}
public String getFileSystem() {
@@ -409,6 +415,14 @@ public class AbfsClient {
return encodedString;
}
+  /**
+   * Builds the bearer-token string for the configured token provider.
+   *
+   * @return {@code "Bearer "} followed by the provider's current access
+   *         token, or {@code null} when no token provider was supplied
+   * @throws IOException propagated from the token provider
+   */
+  public synchronized String getAccessToken() throws IOException {
+    if (tokenProvider == null) {
+      return null;
+    }
+    return "Bearer " + tokenProvider.getToken().getAccessToken();
+  }
+
@VisibleForTesting
String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
final String sslProviderName) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
index 46b4c6d..0067b75 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.fs.azurebfs.services;
/**
- * The Http Request / Response Headers for Rest AbfsClient
+ * The Http Request / Response Headers for Rest AbfsClient.
*/
public class AbfsHttpHeader {
private final String name;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 6dd32fa..c0407f5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
/**
* The AbfsRestOperation for Rest AbfsClient.
@@ -48,7 +49,7 @@ public class AbfsRestOperation {
// request body and all the download methods have a response body.
private final boolean hasRequestBody;
- private final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
// For uploads, this is the request entity body. For downloads,
// this will hold the response entity body.
@@ -139,9 +140,15 @@ public class AbfsRestOperation {
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
// sign the HTTP request
- client.getSharedKeyCredentials().signRequest(
- httpOperation.getConnection(),
- hasRequestBody ? bufferLength : 0);
+ if (client.getAccessToken() == null) {
+ // sign the HTTP request
+ client.getSharedKeyCredentials().signRequest(
+ httpOperation.getConnection(),
+ hasRequestBody ? bufferLength : 0);
+ } else {
+ httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
+ client.getAccessToken());
+ }
if (hasRequestBody) {
// HttpUrlConnection requires
@@ -163,9 +170,7 @@ public class AbfsRestOperation {
return false;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("HttpRequest: " + httpOperation.toString());
- }
+ LOG.debug("HttpRequest: " + httpOperation.toString());
if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
return false;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
new file mode 100644
index 0000000..c95b92c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Auth Type Enum.
+ *
+ * Names the three authentication types: SharedKey, OAuth and Custom.
+ * NOTE(review): presumably these correspond to shared-key request signing,
+ * the token providers in the oauth2 package, and a user-supplied custom
+ * token provider respectively -- confirm against the code that selects
+ * the auth type.
+ */
+public enum AuthType {
+ SharedKey,
+ OAuth,
+ Custom
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org