You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/11/30 14:40:24 UTC
[hadoop] branch branch-3.3.5 updated: HADOOP-18457. ABFS: Support account level throttling (#5034)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
new 98aac2a15ca HADOOP-18457. ABFS: Support account level throttling (#5034)
98aac2a15ca is described below
commit 98aac2a15cad462f5a24befa9a6141d22fa13f66
Author: Anmol Asrani <an...@gmail.com>
AuthorDate: Wed Nov 30 18:35:31 2022 +0530
HADOOP-18457. ABFS: Support account level throttling (#5034)
This allows abfs request throttling to be shared across all
abfs connections talking to containers belonging to the same abfs storage
account — as that is the level at which IO throttling is applied.
The option is enabled/disabled in the configuration option
"fs.azure.account.throttling.enabled";
The default is "true"
Contributed by Anmol Asrani
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 24 +++
.../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 2 -
.../fs/azurebfs/constants/ConfigurationKeys.java | 3 +
.../constants/FileSystemConfigurations.java | 3 +
.../hadoop/fs/azurebfs/services/AbfsClient.java | 14 ++
.../services/AbfsClientThrottlingAnalyzer.java | 132 ++++++++------
.../services/AbfsClientThrottlingIntercept.java | 107 +++++++++---
.../services/AbfsNoOpThrottlingIntercept.java | 37 ++++
.../fs/azurebfs/services/AbfsOperationMetrics.java | 139 +++++++++++++++
.../fs/azurebfs/services/AbfsRestOperation.java | 11 +-
.../azurebfs/services/AbfsThrottlingIntercept.java | 49 ++++++
.../services/AbfsThrottlingInterceptFactory.java | 102 +++++++++++
.../fs/azurebfs/services/TimerFunctionality.java | 26 +++
.../hadoop-azure/src/site/markdown/abfs.md | 9 +
.../azurebfs/constants/TestConfigurationKeys.java | 3 +
.../fs/azurebfs/services/TestAbfsClient.java | 5 +
.../services/TestAbfsClientThrottlingAnalyzer.java | 30 ++--
.../services/TestExponentialRetryPolicy.java | 192 +++++++++++++++++++++
18 files changed, 796 insertions(+), 92 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 4c77d2e136d..5a31b10b514 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -117,6 +117,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
+ DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
+ private boolean accountThrottlingEnabled;
+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
@@ -260,6 +264,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling;
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
+ DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
+ private int accountOperationIdleTimeout;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ANALYSIS_PERIOD,
+ DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS)
+ private int analysisPeriod;
+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT,
MinValue = 0,
DefaultValue = RATE_LIMIT_DEFAULT)
@@ -694,6 +706,10 @@ public class AbfsConfiguration{
return this.azureAppendBlobDirs;
}
+ public boolean accountThrottlingEnabled() {
+ return accountThrottlingEnabled;
+ }
+
public String getAzureInfiniteLeaseDirs() {
return this.azureInfiniteLeaseDirs;
}
@@ -736,6 +752,14 @@ public class AbfsConfiguration{
return this.enableAutoThrottling;
}
+ public int getAccountOperationIdleTimeout() {
+ return accountOperationIdleTimeout;
+ }
+
+ public int getAnalysisPeriod() {
+ return analysisPeriod;
+ }
+
public int getRateLimit() {
return rateLimit;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index d0bdd9818db..21501d28f42 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -55,7 +55,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
-import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.classification.InterfaceStability;
@@ -225,7 +224,6 @@ public class AzureBlobFileSystem extends FileSystem
}
}
- AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 0353f3e01ff..a59f76b6d0f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -38,6 +38,7 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
+ public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";
// Retry strategy defined by the user
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
@@ -116,6 +117,8 @@ public final class ConfigurationKeys {
public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
+ public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout";
+ public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
/** This config ensures that during create overwrite an existing file will be
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 097285bb48f..0ea2c929800 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -94,6 +94,9 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
+ public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
+ public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
+ public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;
public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
= DelegatingSSLSocketFactory.SSLChannelMode.Default;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index fe2ea35f1df..9974979aeba 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -101,6 +101,7 @@ public class AbfsClient implements Closeable {
private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;
+ private final AbfsThrottlingIntercept intercept;
private final ListeningScheduledExecutorService executorService;
@@ -120,6 +121,7 @@ public class AbfsClient implements Closeable {
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
+ this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
String encryptionKey = this.abfsConfiguration
.getClientProvidedEncryptionKey();
@@ -216,6 +218,10 @@ public class AbfsClient implements Closeable {
return sharedKeyCredentials;
}
+ AbfsThrottlingIntercept getIntercept() {
+ return intercept;
+ }
+
List<AbfsHttpHeader> createDefaultHeaders() {
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
@@ -1277,6 +1283,14 @@ public class AbfsClient implements Closeable {
return abfsCounters;
}
+ /**
+ * Getter for abfsConfiguration from AbfsClient.
+ * @return AbfsConfiguration instance
+ */
+ protected AbfsConfiguration getAbfsConfiguration() {
+ return abfsConfiguration;
+ }
+
public int getNumLeaseThreads() {
return abfsConfiguration.getNumLeaseThreads();
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
index a55c924dd81..f1eb3a2a774 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
@@ -20,20 +20,23 @@ package org.apache.hadoop.fs.azurebfs.services;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.util.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.util.Time.now;
+
class AbfsClientThrottlingAnalyzer {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingAnalyzer.class);
- private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
@@ -50,42 +53,38 @@ class AbfsClientThrottlingAnalyzer {
private String name = null;
private Timer timer = null;
private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
+ private AtomicLong lastExecutionTime = null;
+ private final AtomicBoolean isOperationOnAccountIdle = new AtomicBoolean(false);
+ private AbfsConfiguration abfsConfiguration = null;
+ private boolean accountLevelThrottlingEnabled = true;
private AbfsClientThrottlingAnalyzer() {
// hide default constructor
}
- /**
- * Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
- * the specified name.
- *
- * @param name a name used to identify this instance.
- * @throws IllegalArgumentException if name is null or empty.
- */
- AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
- this(name, DEFAULT_ANALYSIS_PERIOD_MS);
- }
-
/**
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
* the specified name and period.
*
* @param name A name used to identify this instance.
- * @param period The frequency, in milliseconds, at which metrics are
- * analyzed.
+ * @param abfsConfiguration The configuration set.
* @throws IllegalArgumentException If name is null or empty.
* If period is less than 1000 or greater than 30000 milliseconds.
*/
- AbfsClientThrottlingAnalyzer(String name, int period)
+ AbfsClientThrottlingAnalyzer(String name, AbfsConfiguration abfsConfiguration)
throws IllegalArgumentException {
Preconditions.checkArgument(
StringUtils.isNotEmpty(name),
"The argument 'name' cannot be null or empty.");
+ int period = abfsConfiguration.getAnalysisPeriod();
Preconditions.checkArgument(
period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
"The argument 'period' must be between 1000 and 30000.");
this.name = name;
- this.analysisPeriodMs = period;
+ this.abfsConfiguration = abfsConfiguration;
+ this.accountLevelThrottlingEnabled = abfsConfiguration.accountThrottlingEnabled();
+ this.analysisPeriodMs = abfsConfiguration.getAnalysisPeriod();
+ this.lastExecutionTime = new AtomicLong(now());
this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
new AbfsOperationMetrics(System.currentTimeMillis()));
this.timer = new Timer(
@@ -95,6 +94,47 @@ class AbfsClientThrottlingAnalyzer {
analysisPeriodMs);
}
+ /**
+ * Resumes the timer if it was stopped.
+ */
+ private void resumeTimer() {
+ blobMetrics = new AtomicReference<AbfsOperationMetrics>(
+ new AbfsOperationMetrics(System.currentTimeMillis()));
+ timer.schedule(new TimerTaskImpl(),
+ analysisPeriodMs,
+ analysisPeriodMs);
+ isOperationOnAccountIdle.set(false);
+ }
+
+ /**
+ * Synchronized method to suspend or resume timer.
+ * @param timerFunctionality resume or suspend.
+ * @param timerTask The timertask object.
+ * @return true or false.
+ */
+ private synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,
+ TimerTask timerTask) {
+ switch (timerFunctionality) {
+ case RESUME:
+ if (isOperationOnAccountIdle.get()) {
+ resumeTimer();
+ }
+ break;
+ case SUSPEND:
+ if (accountLevelThrottlingEnabled && (System.currentTimeMillis()
+ - lastExecutionTime.get() >= getOperationIdleTimeout())) {
+ isOperationOnAccountIdle.set(true);
+ timerTask.cancel();
+ timer.purge();
+ return true;
+ }
+ break;
+ default:
+ break;
+ }
+ return false;
+ }
+
/**
* Updates metrics with results from the current storage operation.
*
@@ -104,12 +144,13 @@ class AbfsClientThrottlingAnalyzer {
public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) {
- metrics.bytesFailed.addAndGet(count);
- metrics.operationsFailed.incrementAndGet();
+ metrics.addBytesFailed(count);
+ metrics.incrementOperationsFailed();
} else {
- metrics.bytesSuccessful.addAndGet(count);
- metrics.operationsSuccessful.incrementAndGet();
+ metrics.addBytesSuccessful(count);
+ metrics.incrementOperationsSuccessful();
}
+ blobMetrics.set(metrics);
}
/**
@@ -117,6 +158,8 @@ class AbfsClientThrottlingAnalyzer {
* @return true if Thread sleeps(Throttling occurs) else false.
*/
public boolean suspendIfNecessary() {
+ lastExecutionTime.set(now());
+ timerOrchestrator(TimerFunctionality.RESUME, null);
int duration = sleepDuration;
if (duration > 0) {
try {
@@ -134,19 +177,27 @@ class AbfsClientThrottlingAnalyzer {
return sleepDuration;
}
+ int getOperationIdleTimeout() {
+ return abfsConfiguration.getAccountOperationIdleTimeout();
+ }
+
+ AtomicBoolean getIsOperationOnAccountIdle() {
+ return isOperationOnAccountIdle;
+ }
+
private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
int sleepDuration) {
final double percentageConversionFactor = 100;
- double bytesFailed = metrics.bytesFailed.get();
- double bytesSuccessful = metrics.bytesSuccessful.get();
- double operationsFailed = metrics.operationsFailed.get();
- double operationsSuccessful = metrics.operationsSuccessful.get();
+ double bytesFailed = metrics.getBytesFailed().get();
+ double bytesSuccessful = metrics.getBytesSuccessful().get();
+ double operationsFailed = metrics.getOperationsFailed().get();
+ double operationsSuccessful = metrics.getOperationsSuccessful().get();
double errorPercentage = (bytesFailed <= 0)
? 0
: (percentageConversionFactor
* bytesFailed
/ (bytesFailed + bytesSuccessful));
- long periodMs = metrics.endTime - metrics.startTime;
+ long periodMs = metrics.getEndTime() - metrics.getStartTime();
double newSleepDuration;
@@ -238,10 +289,13 @@ class AbfsClientThrottlingAnalyzer {
}
long now = System.currentTimeMillis();
- if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
+ if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
+ return;
+ }
+ if (now - blobMetrics.get().getStartTime() >= analysisPeriodMs) {
AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
new AbfsOperationMetrics(now));
- oldMetrics.endTime = now;
+ oldMetrics.setEndTime(now);
sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
sleepDuration);
}
@@ -252,24 +306,4 @@ class AbfsClientThrottlingAnalyzer {
}
}
}
-
- /**
- * Stores Abfs operation metrics during each analysis period.
- */
- static class AbfsOperationMetrics {
- private AtomicLong bytesFailed;
- private AtomicLong bytesSuccessful;
- private AtomicLong operationsFailed;
- private AtomicLong operationsSuccessful;
- private long endTime;
- private long startTime;
-
- AbfsOperationMetrics(long startTime) {
- this.startTime = startTime;
- this.bytesFailed = new AtomicLong();
- this.bytesSuccessful = new AtomicLong();
- this.operationsFailed = new AtomicLong();
- this.operationsSuccessful = new AtomicLong();
- }
- }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
index 7303e833418..52a46bc7469 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -19,10 +19,12 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.net.HttpURLConnection;
+import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
@@ -38,35 +40,89 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
* and sleeps just enough to minimize errors, allowing optimal ingress and/or
* egress throughput.
*/
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingIntercept.class);
private static final String RANGE_PREFIX = "bytes=";
- private static AbfsClientThrottlingIntercept singleton = null;
- private AbfsClientThrottlingAnalyzer readThrottler = null;
- private AbfsClientThrottlingAnalyzer writeThrottler = null;
- private static boolean isAutoThrottlingEnabled = false;
+ private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
+ private static final ReentrantLock LOCK = new ReentrantLock();
+ private final AbfsClientThrottlingAnalyzer readThrottler;
+ private final AbfsClientThrottlingAnalyzer writeThrottler;
+ private final String accountName;
// Hide default constructor
- private AbfsClientThrottlingIntercept() {
- readThrottler = new AbfsClientThrottlingAnalyzer("read");
- writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+ this.accountName = accountName;
+ this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
}
- public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
- if (!enableAutoThrottling) {
- return;
- }
+ // Hide default constructor
+ private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+ //Account name is kept as empty as same instance is shared across all accounts
+ this.accountName = "";
+ this.readThrottler = setAnalyzer("read", abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept");
+ }
+
+ /**
+ * Sets the analyzer for the intercept.
+ * @param name Name of the analyzer.
+ * @param abfsConfiguration The configuration.
+ * @return AbfsClientThrottlingAnalyzer instance.
+ */
+ private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+ return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+ }
+
+ /**
+ * Returns the analyzer for read operations.
+ * @return AbfsClientThrottlingAnalyzer for read.
+ */
+ AbfsClientThrottlingAnalyzer getReadThrottler() {
+ return readThrottler;
+ }
+
+ /**
+ * Returns the analyzer for write operations.
+ * @return AbfsClientThrottlingAnalyzer for write.
+ */
+ AbfsClientThrottlingAnalyzer getWriteThrottler() {
+ return writeThrottler;
+ }
+
+ /**
+ * Creates a singleton object of the AbfsClientThrottlingIntercept.
+ * which is shared across all filesystem instances.
+ * @param abfsConfiguration configuration set.
+ * @return singleton object of intercept.
+ */
+ static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
if (singleton == null) {
- singleton = new AbfsClientThrottlingIntercept();
- isAutoThrottlingEnabled = true;
- LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+ LOCK.lock();
+ try {
+ if (singleton == null) {
+ singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+ }
+ } finally {
+ LOCK.unlock();
+ }
}
+ return singleton;
}
- static void updateMetrics(AbfsRestOperationType operationType,
- AbfsHttpOperation abfsHttpOperation) {
- if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
+ /**
+ * Updates the metrics for successful and failed read and write operations.
+ * @param operationType Only applicable for read and write operations.
+ * @param abfsHttpOperation Used for status code and data transferred.
+ */
+ @Override
+ public void updateMetrics(AbfsRestOperationType operationType,
+ AbfsHttpOperation abfsHttpOperation) {
+ if (abfsHttpOperation == null) {
return;
}
@@ -82,7 +138,7 @@ public final class AbfsClientThrottlingIntercept {
case Append:
contentLength = abfsHttpOperation.getBytesSent();
if (contentLength > 0) {
- singleton.writeThrottler.addBytesTransferred(contentLength,
+ writeThrottler.addBytesTransferred(contentLength,
isFailedOperation);
}
break;
@@ -90,7 +146,7 @@ public final class AbfsClientThrottlingIntercept {
String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE);
contentLength = getContentLengthIfKnown(range);
if (contentLength > 0) {
- singleton.readThrottler.addBytesTransferred(contentLength,
+ readThrottler.addBytesTransferred(contentLength,
isFailedOperation);
}
break;
@@ -104,21 +160,18 @@ public final class AbfsClientThrottlingIntercept {
* uses this to suspend the request, if necessary, to minimize errors and
* maximize throughput.
*/
- static void sendingRequest(AbfsRestOperationType operationType,
+ @Override
+ public void sendingRequest(AbfsRestOperationType operationType,
AbfsCounters abfsCounters) {
- if (!isAutoThrottlingEnabled) {
- return;
- }
-
switch (operationType) {
case ReadFile:
- if (singleton.readThrottler.suspendIfNecessary()
+ if (readThrottler.suspendIfNecessary()
&& abfsCounters != null) {
abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
}
break;
case Append:
- if (singleton.writeThrottler.suspendIfNecessary()
+ if (writeThrottler.suspendIfNecessary()
&& abfsCounters != null) {
abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
new file mode 100644
index 00000000000..6b84e583c33
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {
+
+ public static final AbfsNoOpThrottlingIntercept INSTANCE = new AbfsNoOpThrottlingIntercept();
+
+ private AbfsNoOpThrottlingIntercept() {
+ }
+
+ @Override
+ public void updateMetrics(final AbfsRestOperationType operationType,
+ final AbfsHttpOperation abfsHttpOperation) {
+ }
+
+ @Override
+ public void sendingRequest(final AbfsRestOperationType operationType,
+ final AbfsCounters abfsCounters) {
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java
new file mode 100644
index 00000000000..2e53367d39f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Stores Abfs operation metrics during each analysis period.
+ */
+class AbfsOperationMetrics {
+
+ /**
+ * No of bytes which could not be transferred due to a failed operation.
+ */
+ private final AtomicLong bytesFailed;
+
+ /**
+ * No of bytes successfully transferred during a successful operation.
+ */
+ private final AtomicLong bytesSuccessful;
+
+ /**
+ * Total no of failed operations.
+ */
+ private final AtomicLong operationsFailed;
+
+ /**
+ * Total no of successful operations.
+ */
+ private final AtomicLong operationsSuccessful;
+
+ /**
+ * Time when collection of metrics ended.
+ */
+ private long endTime;
+
+ /**
+ * Time when the collection of metrics started.
+ */
+ private final long startTime;
+
+ AbfsOperationMetrics(long startTime) {
+ this.startTime = startTime;
+ this.bytesFailed = new AtomicLong();
+ this.bytesSuccessful = new AtomicLong();
+ this.operationsFailed = new AtomicLong();
+ this.operationsSuccessful = new AtomicLong();
+ }
+
+ /**
+ *
+ * @return bytes failed to transfer.
+ */
+ AtomicLong getBytesFailed() {
+ return bytesFailed;
+ }
+
+ /**
+ *
+ * @return bytes successfully transferred.
+ */
+ AtomicLong getBytesSuccessful() {
+ return bytesSuccessful;
+ }
+
+ /**
+ *
+ * @return no of operations failed.
+ */
+ AtomicLong getOperationsFailed() {
+ return operationsFailed;
+ }
+
+ /**
+ *
+ * @return no of successful operations.
+ */
+ AtomicLong getOperationsSuccessful() {
+ return operationsSuccessful;
+ }
+
+ /**
+ *
+ * @return end time of metric collection.
+ */
+ long getEndTime() {
+ return endTime;
+ }
+
+ /**
+ *
+ * @param endTime sets the end time.
+ */
+ void setEndTime(final long endTime) {
+ this.endTime = endTime;
+ }
+
+ /**
+ *
+ * @return start time of metric collection.
+ */
+ long getStartTime() {
+ return startTime;
+ }
+
+ void addBytesFailed(long bytes) {
+ this.getBytesFailed().addAndGet(bytes);
+ }
+
+ void addBytesSuccessful(long bytes) {
+ this.getBytesSuccessful().addAndGet(bytes);
+ }
+
+ void incrementOperationsFailed() {
+ this.getOperationsFailed().incrementAndGet();
+ }
+
+ void incrementOperationsSuccessful() {
+ this.getOperationsSuccessful().incrementAndGet();
+ }
+
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index c2a80f81770..2c669e9e101 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -45,6 +45,8 @@ public class AbfsRestOperation {
private final AbfsRestOperationType operationType;
// Blob FS client, which has the credentials, retry policy, and logs.
private final AbfsClient client;
+ // Return intercept instance
+ private final AbfsThrottlingIntercept intercept;
// the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
private final String method;
// full URL including query parameters
@@ -145,6 +147,7 @@ public class AbfsRestOperation {
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
this.sasToken = sasToken;
this.abfsCounters = client.getAbfsCounters();
+ this.intercept = client.getIntercept();
}
/**
@@ -241,7 +244,8 @@ public class AbfsRestOperation {
*/
private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException {
- AbfsHttpOperation httpOperation = null;
+ AbfsHttpOperation httpOperation;
+
try {
// initialize the HTTP request and open the connection
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
@@ -278,8 +282,7 @@ public class AbfsRestOperation {
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
httpOperation.getConnection().getRequestProperties());
- AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
-
+ intercept.sendingRequest(operationType, abfsCounters);
if (hasRequestBody) {
// HttpUrlConnection requires
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
@@ -317,7 +320,7 @@ public class AbfsRestOperation {
return false;
} finally {
- AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
+ intercept.updateMetrics(operationType, httpOperation);
}
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString());
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
new file mode 100644
index 00000000000..57b5095bb32
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An interface for the ABFS throttling intercept, implemented per account
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface AbfsThrottlingIntercept {
+
+ /**
+ * Updates the metrics for successful and failed read and write operations.
+ * @param operationType Only applicable for read and write operations.
+ * @param abfsHttpOperation Used for status code and data transferred.
+ */
+ void updateMetrics(AbfsRestOperationType operationType,
+ AbfsHttpOperation abfsHttpOperation);
+
+ /**
+ * Called before the request is sent. Client-side throttling
+ * uses this to suspend the request, if necessary, to minimize errors and
+ * maximize throughput.
+ * @param operationType Only applicable for read and write operations.
+ * @param abfsCounters Used for counters.
+ */
+ void sendingRequest(AbfsRestOperationType operationType,
+ AbfsCounters abfsCounters);
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java
new file mode 100644
index 00000000000..279b7a318ca
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.util.WeakReferenceMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to get an instance of throttling intercept class per account.
+ */
+final class AbfsThrottlingInterceptFactory {
+
+ private AbfsThrottlingInterceptFactory() {
+ }
+
+ private static AbfsConfiguration abfsConfig;
+
+ /**
+ * List of references notified of loss.
+ */
+ private static List<String> lostReferences = new ArrayList<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsThrottlingInterceptFactory.class);
+
+ /**
+ * Map which stores instance of ThrottlingIntercept class per account.
+ */
+ private static WeakReferenceMap<String, AbfsThrottlingIntercept>
+ interceptMap = new WeakReferenceMap<>(
+ AbfsThrottlingInterceptFactory::factory,
+ AbfsThrottlingInterceptFactory::referenceLost);
+
+ /**
+ * Returns instance of throttling intercept.
+ * @param accountName Account name.
+ * @return instance of throttling intercept.
+ */
+ private static AbfsClientThrottlingIntercept factory(final String accountName) {
+ return new AbfsClientThrottlingIntercept(accountName, abfsConfig);
+ }
+
+ /**
+ * Reference lost callback.
+ * @param accountName key lost.
+ */
+ private static void referenceLost(String accountName) {
+ lostReferences.add(accountName);
+ }
+
+ /**
+ * Returns an instance of AbfsThrottlingIntercept.
+ *
+ * @param accountName The account for which we need instance of throttling intercept.
 * @param abfsConfiguration The object of the AbfsConfiguration class.
+ * @return Instance of AbfsThrottlingIntercept.
+ */
+ static synchronized AbfsThrottlingIntercept getInstance(String accountName,
+ AbfsConfiguration abfsConfiguration) {
+ abfsConfig = abfsConfiguration;
+ AbfsThrottlingIntercept intercept;
+ if (!abfsConfiguration.isAutoThrottlingEnabled()) {
+ return AbfsNoOpThrottlingIntercept.INSTANCE;
+ }
+ // If singleton is enabled use a static instance of the intercept class for all accounts
+ if (!abfsConfiguration.accountThrottlingEnabled()) {
+ intercept = AbfsClientThrottlingIntercept.initializeSingleton(
+ abfsConfiguration);
+ } else {
+ // Return the instance from the map
+ intercept = interceptMap.get(accountName);
+ if (intercept == null) {
+ intercept = new AbfsClientThrottlingIntercept(accountName,
+ abfsConfiguration);
+ interceptMap.put(accountName, intercept);
+ }
+ }
+ return intercept;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
new file mode 100644
index 00000000000..bf7da69ec49
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+public enum TimerFunctionality {
+ RESUME,
+
+ SUSPEND
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 35d36055604..31498df1790 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -767,6 +767,12 @@ Hflush() being the only documented API that can provide persistent data
transfer, Flush() also attempting to persist buffered data will lead to
performance issues.
+
+### <a name="accountlevelthrottlingoptions"></a> Account level throttling Options
+
+`fs.azure.account.operation.idle.timeout`: This value specifies the time after which the timer for the analyzer (read or
+write) should be paused until no new request is made again. The default value for the same is 60 seconds.
+
### <a name="hnscheckconfigoptions"></a> HNS Check Options
Config `fs.azure.account.hns.enabled` provides an option to specify whether
the storage account is HNS enabled or not. In case the config is not provided,
@@ -877,6 +883,9 @@ when there are too many writes from the same process.
tuned with this config considering each queued request holds a buffer. Set
the value 3 or 4 times the value set for s.azure.write.max.concurrent.requests.
+`fs.azure.analysis.period`: The time after which sleep duration is recomputed after analyzing metrics. The default value
+for the same is 10 seconds.
+
### <a name="securityconfigoptions"></a> Security Options
`fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
is made true. Irrespective of the flag, `AbfsClient` will use HTTPS if the secure
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
index 565eb38c4f7..9e40f22d231 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
@@ -24,6 +24,9 @@ package org.apache.hadoop.fs.azurebfs.constants;
public final class TestConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_NAME = "fs.azure.account.name";
public static final String FS_AZURE_ABFS_ACCOUNT_NAME = "fs.azure.abfs.account.name";
+ public static final String FS_AZURE_ABFS_ACCOUNT1_NAME = "fs.azure.abfs.account1.name";
+ public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
+ public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key";
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 0a1dca7e7d8..08eb3adc926 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -306,6 +306,11 @@ public final class TestAbfsClient {
when(client.getAccessToken()).thenCallRealMethod();
when(client.getSharedKeyCredentials()).thenCallRealMethod();
when(client.createDefaultHeaders()).thenCallRealMethod();
+ when(client.getAbfsConfiguration()).thenReturn(abfsConfig);
+ when(client.getIntercept()).thenReturn(
+ AbfsThrottlingInterceptFactory.getInstance(
+ abfsConfig.getAccountName().substring(0,
+ abfsConfig.getAccountName().indexOf(DOT)), abfsConfig));
// override baseurl
client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
index 3f680e49930..22649cd190d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
@@ -18,9 +18,15 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Test;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ANALYSIS_PERIOD;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -33,6 +39,15 @@ public class TestAbfsClientThrottlingAnalyzer {
+ ANALYSIS_PERIOD / 10;
private static final long MEGABYTE = 1024 * 1024;
private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
+ private AbfsConfiguration abfsConfiguration;
+
+ public TestAbfsClientThrottlingAnalyzer() throws IOException, IllegalAccessException {
+ final Configuration configuration = new Configuration();
+ configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+ configuration.setInt(FS_AZURE_ANALYSIS_PERIOD, 1000);
+ this.abfsConfiguration = new AbfsConfiguration(configuration,
+ "dummy");
+ }
private void sleep(long milliseconds) {
try {
@@ -82,8 +97,7 @@ public class TestAbfsClientThrottlingAnalyzer {
@Test
public void testNoMetricUpdatesThenNoWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
- "test",
- ANALYSIS_PERIOD);
+ "test", abfsConfiguration);
validate(0, analyzer.getSleepDuration());
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
validate(0, analyzer.getSleepDuration());
@@ -96,8 +110,7 @@ public class TestAbfsClientThrottlingAnalyzer {
@Test
public void testOnlySuccessThenNoWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
- "test",
- ANALYSIS_PERIOD);
+ "test", abfsConfiguration);
analyzer.addBytesTransferred(8 * MEGABYTE, false);
validate(0, analyzer.getSleepDuration());
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
@@ -112,8 +125,7 @@ public class TestAbfsClientThrottlingAnalyzer {
@Test
public void testOnlyErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
- "test",
- ANALYSIS_PERIOD);
+ "test", abfsConfiguration);
validate(0, analyzer.getSleepDuration());
analyzer.addBytesTransferred(4 * MEGABYTE, true);
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
@@ -132,8 +144,7 @@ public class TestAbfsClientThrottlingAnalyzer {
@Test
public void testSuccessAndErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
- "test",
- ANALYSIS_PERIOD);
+ "test", abfsConfiguration);
validate(0, analyzer.getSleepDuration());
analyzer.addBytesTransferred(8 * MEGABYTE, false);
analyzer.addBytesTransferred(2 * MEGABYTE, true);
@@ -157,8 +168,7 @@ public class TestAbfsClientThrottlingAnalyzer {
@Test
public void testManySuccessAndErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
- "test",
- ANALYSIS_PERIOD);
+ "test", abfsConfiguration);
validate(0, analyzer.getSleepDuration());
final int numberOfRequests = 20;
for (int i = 0; i < numberOfRequests; i++) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
index 0f8dc55aa14..a1fc4e138d6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
@@ -18,13 +18,35 @@
package org.apache.hadoop.fs.azurebfs.services;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_AUTOTHROTTLING;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT1_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
+
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.mockito.Mockito;
+
+import java.net.URI;
import java.util.Random;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.junit.Assert;
import org.junit.Test;
@@ -41,6 +63,9 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
private final int noRetryCount = 0;
private final int retryCount = new Random().nextInt(maxRetryCount);
private final int retryCountBeyondMax = maxRetryCount + 1;
+ private static final String TEST_PATH = "/testfile";
+ private static final double MULTIPLYING_FACTOR = 1.5;
+ private static final int ANALYSIS_PERIOD = 10000;
public TestExponentialRetryPolicy() throws Exception {
@@ -67,6 +92,173 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testThrottlingIntercept() throws Exception {
+ AzureBlobFileSystem fs = getFileSystem();
+ final Configuration configuration = new Configuration();
+ configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+ configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, false);
+
+ // On disabling throttling AbfsNoOpThrottlingIntercept object is returned
+ AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
+ "dummy.dfs.core.windows.net");
+ AbfsThrottlingIntercept intercept;
+ AbfsClient abfsClient = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
+ intercept = abfsClient.getIntercept();
+ Assertions.assertThat(intercept)
+ .describedAs("AbfsNoOpThrottlingIntercept instance expected")
+ .isInstanceOf(AbfsNoOpThrottlingIntercept.class);
+
+ configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, true);
+ configuration.setBoolean(FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED, true);
+ // On enabling throttling an AbfsClientThrottlingIntercept object is returned
+ AbfsConfiguration abfsConfiguration1 = new AbfsConfiguration(configuration,
+ "dummy1.dfs.core.windows.net");
+ AbfsClient abfsClient1 = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
+ intercept = abfsClient1.getIntercept();
+ Assertions.assertThat(intercept)
+ .describedAs("AbfsClientThrottlingIntercept instance expected")
+ .isInstanceOf(AbfsClientThrottlingIntercept.class);
+ }
+
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(ANALYSIS_PERIOD);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.accountThrottlingEnabled()).thenReturn(false);
+
+ AbfsThrottlingIntercept instance1 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName1 != null && !accountName1.isEmpty());
+
+ AbfsThrottlingIntercept instance2 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ //if singleton is enabled, for different accounts both the instances should return same value
+ Assertions.assertThat(instance1)
+ .describedAs(
+ "if singleton is enabled, for different accounts both the instances should return same value")
+ .isEqualTo(instance2);
+
+ when(configuration.accountThrottlingEnabled()).thenReturn(true);
+ AbfsThrottlingIntercept instance3 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsThrottlingIntercept instance4 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsThrottlingIntercept instance5 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
+ //if singleton is not enabled, for different accounts instances should return different value
+ Assertions.assertThat(instance3)
+ .describedAs(
+ "iff singleton is not enabled, for different accounts instances should return different value")
+ .isNotEqualTo(instance4);
+
+ //if singleton is not enabled, for same accounts instances should return same value
+ Assertions.assertThat(instance3)
+ .describedAs(
+ "if singleton is not enabled, for same accounts instances should return same value")
+ .isEqualTo(instance5);
+ }
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = fs.getAbfsStore().getClient();
+ AbfsConfiguration configuration1 = client.getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAutoThrottlingEnabled());
+ Assume.assumeTrue(configuration1.accountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept
+ = (AbfsClientThrottlingIntercept) client.getIntercept();
+ final byte[] b = new byte[2 * MIN_BUFFER_SIZE];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ try (FSDataOutputStream stream = fs.create(testPath)) {
+ stream.write(b);
+ }
+
+ //Don't perform any operation on the account.
+ int sleepTime = (int) ((getAbfsConfig().getAccountOperationIdleTimeout()) * MULTIPLYING_FACTOR);
+ Thread.sleep(sleepTime);
+
+ try (FSDataInputStream streamRead = fs.open(testPath)) {
+ streamRead.read(b);
+ }
+
+ //Perform operations on another account.
+ AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName1 != null && !accountName1.isEmpty());
+ final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+ URI defaultUri1 = null;
+ defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+ fs1.initialize(defaultUri1, getRawConfiguration());
+ AbfsClient client1 = fs1.getAbfsStore().getClient();
+ AbfsClientThrottlingIntercept accountIntercept1
+ = (AbfsClientThrottlingIntercept) client1.getIntercept();
+ try (FSDataOutputStream stream1 = fs1.create(testPath)) {
+ stream1.write(b);
+ }
+
+ //Verify the write analyzer for first account is idle but the read analyzer is not idle.
+ Assertions.assertThat(accountIntercept.getWriteThrottler()
+ .getIsOperationOnAccountIdle()
+ .get())
+ .describedAs("Write analyzer for first account should be idle the first time")
+ .isTrue();
+
+ Assertions.assertThat(
+ accountIntercept.getReadThrottler()
+ .getIsOperationOnAccountIdle()
+ .get())
+ .describedAs("Read analyzer for first account should not be idle")
+ .isFalse();
+
+ //Verify the write analyzer for second account is not idle.
+ Assertions.assertThat(
+ accountIntercept1.getWriteThrottler()
+ .getIsOperationOnAccountIdle()
+ .get())
+ .describedAs("Write analyzer for second account should not be idle")
+ .isFalse();
+
+ //Again perform an operation on the first account.
+ try (FSDataOutputStream stream2 = fs.create(testPath)) {
+ stream2.write(b);
+ }
+
+ //Verify the write analyzer on first account is not idle.
+ Assertions.assertThat(
+
+ accountIntercept.getWriteThrottler()
+ .getIsOperationOnAccountIdle()
+ .get())
+ .describedAs(
+ "Write analyzer for first account should not be idle second time")
+ .isFalse();
+ }
+
@Test
public void testAbfsConfigConstructor() throws Exception {
// Ensure we choose expected values that are not defaults
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org