You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/11/30 14:40:24 UTC

[hadoop] branch branch-3.3.5 updated: HADOOP-18457. ABFS: Support account level throttling (#5034)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
     new 98aac2a15ca HADOOP-18457. ABFS: Support account level throttling (#5034)
98aac2a15ca is described below

commit 98aac2a15cad462f5a24befa9a6141d22fa13f66
Author: Anmol Asrani <an...@gmail.com>
AuthorDate: Wed Nov 30 18:35:31 2022 +0530

    HADOOP-18457. ABFS: Support account level throttling (#5034)
    
    This allows abfs request throttling to be shared across all
    abfs connections talking to containers belonging to the same abfs storage
    account, as that is the level at which IO throttling is applied.
    
    The feature is enabled/disabled with the configuration option
    "fs.azure.account.throttling.enabled".
    The default is "true".
    
    Contributed by Anmol Asrani
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  24 +++
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |   2 -
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   3 +
 .../constants/FileSystemConfigurations.java        |   3 +
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  14 ++
 .../services/AbfsClientThrottlingAnalyzer.java     | 132 ++++++++------
 .../services/AbfsClientThrottlingIntercept.java    | 107 +++++++++---
 .../services/AbfsNoOpThrottlingIntercept.java      |  37 ++++
 .../fs/azurebfs/services/AbfsOperationMetrics.java | 139 +++++++++++++++
 .../fs/azurebfs/services/AbfsRestOperation.java    |  11 +-
 .../azurebfs/services/AbfsThrottlingIntercept.java |  49 ++++++
 .../services/AbfsThrottlingInterceptFactory.java   | 102 +++++++++++
 .../fs/azurebfs/services/TimerFunctionality.java   |  26 +++
 .../hadoop-azure/src/site/markdown/abfs.md         |   9 +
 .../azurebfs/constants/TestConfigurationKeys.java  |   3 +
 .../fs/azurebfs/services/TestAbfsClient.java       |   5 +
 .../services/TestAbfsClientThrottlingAnalyzer.java |  30 ++--
 .../services/TestExponentialRetryPolicy.java       | 192 +++++++++++++++++++++
 18 files changed, 796 insertions(+), 92 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 4c77d2e136d..5a31b10b514 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -117,6 +117,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
   private boolean optimizeFooterRead;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
+      DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
+  private boolean accountThrottlingEnabled;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
       MinValue = MIN_BUFFER_SIZE,
       MaxValue = MAX_BUFFER_SIZE,
@@ -260,6 +264,14 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
   private boolean enableAutoThrottling;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
+      DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
+  private int accountOperationIdleTimeout;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ANALYSIS_PERIOD,
+          DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS)
+  private int analysisPeriod;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT,
       MinValue = 0,
       DefaultValue = RATE_LIMIT_DEFAULT)
@@ -694,6 +706,10 @@ public class AbfsConfiguration{
     return this.azureAppendBlobDirs;
   }
 
+  public boolean accountThrottlingEnabled() {
+    return accountThrottlingEnabled;
+  }
+
   public String getAzureInfiniteLeaseDirs() {
     return this.azureInfiniteLeaseDirs;
   }
@@ -736,6 +752,14 @@ public class AbfsConfiguration{
     return this.enableAutoThrottling;
   }
 
+  public int getAccountOperationIdleTimeout() {
+    return accountOperationIdleTimeout;
+  }
+
+  public int getAnalysisPeriod() {
+    return analysisPeriod;
+  }
+
   public int getRateLimit() {
     return rateLimit;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index d0bdd9818db..21501d28f42 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -55,7 +55,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
-import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
 import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -225,7 +224,6 @@ public class AzureBlobFileSystem extends FileSystem
       }
     }
 
-    AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
     rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());
     LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 0353f3e01ff..a59f76b6d0f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -38,6 +38,7 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
   public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
+  public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";
 
   // Retry strategy defined by the user
   public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
@@ -116,6 +117,8 @@ public final class ConfigurationKeys {
   public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
   public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
   public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
+  public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout";
+  public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
   public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
   public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
   /** This config ensures that during create overwrite an existing file will be
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 097285bb48f..0ea2c929800 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -94,6 +94,9 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_ENABLE_FLUSH = true;
   public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
   public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
+  public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
+  public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
+  public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;
 
   public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
       = DelegatingSSLSocketFactory.SSLChannelMode.Default;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index fe2ea35f1df..9974979aeba 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -101,6 +101,7 @@ public class AbfsClient implements Closeable {
   private AccessTokenProvider tokenProvider;
   private SASTokenProvider sasTokenProvider;
   private final AbfsCounters abfsCounters;
+  private final AbfsThrottlingIntercept intercept;
 
   private final ListeningScheduledExecutorService executorService;
 
@@ -120,6 +121,7 @@ public class AbfsClient implements Closeable {
     this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
     this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
     this.authType = abfsConfiguration.getAuthType(accountName);
+    this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
 
     String encryptionKey = this.abfsConfiguration
         .getClientProvidedEncryptionKey();
@@ -216,6 +218,10 @@ public class AbfsClient implements Closeable {
     return sharedKeyCredentials;
   }
 
+  AbfsThrottlingIntercept getIntercept() {
+    return intercept;
+  }
+
   List<AbfsHttpHeader> createDefaultHeaders() {
     final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
     requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
@@ -1277,6 +1283,14 @@ public class AbfsClient implements Closeable {
     return abfsCounters;
   }
 
+  /**
+   * Getter for abfsConfiguration from AbfsClient.
+   * @return AbfsConfiguration instance
+   */
+  protected AbfsConfiguration getAbfsConfiguration() {
+    return abfsConfiguration;
+  }
+
   public int getNumLeaseThreads() {
     return abfsConfiguration.getNumLeaseThreads();
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
index a55c924dd81..f1eb3a2a774 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
@@ -20,20 +20,23 @@ package org.apache.hadoop.fs.azurebfs.services;
 
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.util.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.util.Time.now;
+
 class AbfsClientThrottlingAnalyzer {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsClientThrottlingAnalyzer.class);
-  private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
   private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
   private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
   private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
@@ -50,42 +53,38 @@ class AbfsClientThrottlingAnalyzer {
   private String name = null;
   private Timer timer = null;
   private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
+  private AtomicLong lastExecutionTime = null;
+  private final AtomicBoolean isOperationOnAccountIdle = new AtomicBoolean(false);
+  private AbfsConfiguration abfsConfiguration = null;
+  private boolean accountLevelThrottlingEnabled = true;
 
   private AbfsClientThrottlingAnalyzer() {
     // hide default constructor
   }
 
-  /**
-   * Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
-   * the specified name.
-   *
-   * @param name a name used to identify this instance.
-   * @throws IllegalArgumentException if name is null or empty.
-   */
-  AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
-    this(name, DEFAULT_ANALYSIS_PERIOD_MS);
-  }
-
   /**
    * Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
    * the specified name and period.
    *
    * @param name   A name used to identify this instance.
-   * @param period The frequency, in milliseconds, at which metrics are
-   *               analyzed.
+   * @param abfsConfiguration The configuration set.
    * @throws IllegalArgumentException If name is null or empty.
    *                                  If period is less than 1000 or greater than 30000 milliseconds.
    */
-  AbfsClientThrottlingAnalyzer(String name, int period)
+  AbfsClientThrottlingAnalyzer(String name, AbfsConfiguration abfsConfiguration)
       throws IllegalArgumentException {
     Preconditions.checkArgument(
         StringUtils.isNotEmpty(name),
         "The argument 'name' cannot be null or empty.");
+    int period = abfsConfiguration.getAnalysisPeriod();
     Preconditions.checkArgument(
         period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
         "The argument 'period' must be between 1000 and 30000.");
     this.name = name;
-    this.analysisPeriodMs = period;
+    this.abfsConfiguration = abfsConfiguration;
+    this.accountLevelThrottlingEnabled = abfsConfiguration.accountThrottlingEnabled();
+    this.analysisPeriodMs = abfsConfiguration.getAnalysisPeriod();
+    this.lastExecutionTime = new AtomicLong(now());
     this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
         new AbfsOperationMetrics(System.currentTimeMillis()));
     this.timer = new Timer(
@@ -95,6 +94,47 @@ class AbfsClientThrottlingAnalyzer {
         analysisPeriodMs);
   }
 
+  /**
+   * Resumes the timer if it was stopped.
+   */
+  private void resumeTimer() {
+    blobMetrics = new AtomicReference<AbfsOperationMetrics>(
+            new AbfsOperationMetrics(System.currentTimeMillis()));
+    timer.schedule(new TimerTaskImpl(),
+            analysisPeriodMs,
+            analysisPeriodMs);
+    isOperationOnAccountIdle.set(false);
+  }
+
+  /**
+   * Synchronized method to suspend or resume timer.
+   * @param timerFunctionality resume or suspend.
+   * @param timerTask The timertask object.
+   * @return true if the timer task was cancelled (suspended); false otherwise.
+   */
+  private synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,
+      TimerTask timerTask) {
+    switch (timerFunctionality) {
+    case RESUME:
+      if (isOperationOnAccountIdle.get()) {
+        resumeTimer();
+      }
+      break;
+    case SUSPEND:
+      if (accountLevelThrottlingEnabled && (System.currentTimeMillis()
+          - lastExecutionTime.get() >= getOperationIdleTimeout())) {
+        isOperationOnAccountIdle.set(true);
+        timerTask.cancel();
+        timer.purge();
+        return true;
+      }
+      break;
+    default:
+      break;
+    }
+    return false;
+  }
+
   /**
    * Updates metrics with results from the current storage operation.
    *
@@ -104,12 +144,13 @@ class AbfsClientThrottlingAnalyzer {
   public void addBytesTransferred(long count, boolean isFailedOperation) {
     AbfsOperationMetrics metrics = blobMetrics.get();
     if (isFailedOperation) {
-      metrics.bytesFailed.addAndGet(count);
-      metrics.operationsFailed.incrementAndGet();
+      metrics.addBytesFailed(count);
+      metrics.incrementOperationsFailed();
     } else {
-      metrics.bytesSuccessful.addAndGet(count);
-      metrics.operationsSuccessful.incrementAndGet();
+      metrics.addBytesSuccessful(count);
+      metrics.incrementOperationsSuccessful();
     }
+    blobMetrics.set(metrics);
   }
 
   /**
@@ -117,6 +158,8 @@ class AbfsClientThrottlingAnalyzer {
    * @return true if Thread sleeps(Throttling occurs) else false.
    */
   public boolean suspendIfNecessary() {
+    lastExecutionTime.set(now());
+    timerOrchestrator(TimerFunctionality.RESUME, null);
     int duration = sleepDuration;
     if (duration > 0) {
       try {
@@ -134,19 +177,27 @@ class AbfsClientThrottlingAnalyzer {
     return sleepDuration;
   }
 
+  int getOperationIdleTimeout() {
+    return abfsConfiguration.getAccountOperationIdleTimeout();
+  }
+
+  AtomicBoolean getIsOperationOnAccountIdle() {
+    return isOperationOnAccountIdle;
+  }
+
   private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
                                                    int sleepDuration) {
     final double percentageConversionFactor = 100;
-    double bytesFailed = metrics.bytesFailed.get();
-    double bytesSuccessful = metrics.bytesSuccessful.get();
-    double operationsFailed = metrics.operationsFailed.get();
-    double operationsSuccessful = metrics.operationsSuccessful.get();
+    double bytesFailed = metrics.getBytesFailed().get();
+    double bytesSuccessful = metrics.getBytesSuccessful().get();
+    double operationsFailed = metrics.getOperationsFailed().get();
+    double operationsSuccessful = metrics.getOperationsSuccessful().get();
     double errorPercentage = (bytesFailed <= 0)
         ? 0
         : (percentageConversionFactor
         * bytesFailed
         / (bytesFailed + bytesSuccessful));
-    long periodMs = metrics.endTime - metrics.startTime;
+    long periodMs = metrics.getEndTime() - metrics.getStartTime();
 
     double newSleepDuration;
 
@@ -238,10 +289,13 @@ class AbfsClientThrottlingAnalyzer {
         }
 
         long now = System.currentTimeMillis();
-        if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
+        if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
+          return;
+        }
+        if (now - blobMetrics.get().getStartTime() >= analysisPeriodMs) {
           AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
               new AbfsOperationMetrics(now));
-          oldMetrics.endTime = now;
+          oldMetrics.setEndTime(now);
           sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
               sleepDuration);
         }
@@ -252,24 +306,4 @@ class AbfsClientThrottlingAnalyzer {
       }
     }
   }
-
-  /**
-   * Stores Abfs operation metrics during each analysis period.
-   */
-  static class AbfsOperationMetrics {
-    private AtomicLong bytesFailed;
-    private AtomicLong bytesSuccessful;
-    private AtomicLong operationsFailed;
-    private AtomicLong operationsSuccessful;
-    private long endTime;
-    private long startTime;
-
-    AbfsOperationMetrics(long startTime) {
-      this.startTime = startTime;
-      this.bytesFailed = new AtomicLong();
-      this.bytesSuccessful = new AtomicLong();
-      this.operationsFailed = new AtomicLong();
-      this.operationsSuccessful = new AtomicLong();
-    }
-  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
index 7303e833418..52a46bc7469 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.net.HttpURLConnection;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 
@@ -38,35 +40,89 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
  * and sleeps just enough to minimize errors, allowing optimal ingress and/or
  * egress throughput.
  */
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsClientThrottlingIntercept.class);
   private static final String RANGE_PREFIX = "bytes=";
-  private static AbfsClientThrottlingIntercept singleton = null;
-  private AbfsClientThrottlingAnalyzer readThrottler = null;
-  private AbfsClientThrottlingAnalyzer writeThrottler = null;
-  private static boolean isAutoThrottlingEnabled = false;
+  private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
+  private final AbfsClientThrottlingAnalyzer readThrottler;
+  private final AbfsClientThrottlingAnalyzer writeThrottler;
+  private final String accountName;
 
   // Hide default constructor
-  private AbfsClientThrottlingIntercept() {
-    readThrottler = new AbfsClientThrottlingAnalyzer("read");
-    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+  public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+    this.accountName = accountName;
+    this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
   }
 
-  public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
-    if (!enableAutoThrottling) {
-      return;
-    }
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+    //Account name is kept as empty as same instance is shared across all accounts
+    this.accountName = "";
+    this.readThrottler = setAnalyzer("read", abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept");
+  }
+
+  /**
+   * Sets the analyzer for the intercept.
+   * @param name Name of the analyzer.
+   * @param abfsConfiguration The configuration.
+   * @return AbfsClientThrottlingAnalyzer instance.
+   */
+  private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+    return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+  }
+
+  /**
+   * Returns the analyzer for read operations.
+   * @return AbfsClientThrottlingAnalyzer for read.
+   */
+  AbfsClientThrottlingAnalyzer getReadThrottler() {
+    return readThrottler;
+  }
+
+  /**
+   * Returns the analyzer for write operations.
+   * @return AbfsClientThrottlingAnalyzer for write.
+   */
+  AbfsClientThrottlingAnalyzer getWriteThrottler() {
+    return writeThrottler;
+  }
+
+  /**
+   * Creates a singleton object of the AbfsClientThrottlingIntercept,
+   * which is shared across all filesystem instances.
+   * @param abfsConfiguration configuration set.
+   * @return singleton object of intercept.
+   */
+  static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
     if (singleton == null) {
-      singleton = new AbfsClientThrottlingIntercept();
-      isAutoThrottlingEnabled = true;
-      LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+      LOCK.lock();
+      try {
+        if (singleton == null) {
+          singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+          LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+        }
+      } finally {
+        LOCK.unlock();
+      }
     }
+    return singleton;
   }
 
-  static void updateMetrics(AbfsRestOperationType operationType,
-                            AbfsHttpOperation abfsHttpOperation) {
-    if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
+  /**
+   * Updates the metrics for successful and failed read and write operations.
+   * @param operationType Only applicable for read and write operations.
+   * @param abfsHttpOperation Used for status code and data transferred.
+   */
+  @Override
+  public void updateMetrics(AbfsRestOperationType operationType,
+      AbfsHttpOperation abfsHttpOperation) {
+    if (abfsHttpOperation == null) {
       return;
     }
 
@@ -82,7 +138,7 @@ public final class AbfsClientThrottlingIntercept {
       case Append:
         contentLength = abfsHttpOperation.getBytesSent();
         if (contentLength > 0) {
-          singleton.writeThrottler.addBytesTransferred(contentLength,
+          writeThrottler.addBytesTransferred(contentLength,
               isFailedOperation);
         }
         break;
@@ -90,7 +146,7 @@ public final class AbfsClientThrottlingIntercept {
         String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE);
         contentLength = getContentLengthIfKnown(range);
         if (contentLength > 0) {
-          singleton.readThrottler.addBytesTransferred(contentLength,
+          readThrottler.addBytesTransferred(contentLength,
               isFailedOperation);
         }
         break;
@@ -104,21 +160,18 @@ public final class AbfsClientThrottlingIntercept {
    * uses this to suspend the request, if necessary, to minimize errors and
    * maximize throughput.
    */
-  static void sendingRequest(AbfsRestOperationType operationType,
+  @Override
+  public void sendingRequest(AbfsRestOperationType operationType,
       AbfsCounters abfsCounters) {
-    if (!isAutoThrottlingEnabled) {
-      return;
-    }
-
     switch (operationType) {
       case ReadFile:
-        if (singleton.readThrottler.suspendIfNecessary()
+        if (readThrottler.suspendIfNecessary()
             && abfsCounters != null) {
           abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
         }
         break;
       case Append:
-        if (singleton.writeThrottler.suspendIfNecessary()
+        if (writeThrottler.suspendIfNecessary()
             && abfsCounters != null) {
           abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
         }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
new file mode 100644
index 00000000000..6b84e583c33
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {
+
+  public static final AbfsNoOpThrottlingIntercept INSTANCE = new AbfsNoOpThrottlingIntercept();
+
+  private AbfsNoOpThrottlingIntercept() {
+  }
+
+  @Override
+  public void updateMetrics(final AbfsRestOperationType operationType,
+      final AbfsHttpOperation abfsHttpOperation) {
+  }
+
+  @Override
+  public void sendingRequest(final AbfsRestOperationType operationType,
+      final AbfsCounters abfsCounters) {
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java
new file mode 100644
index 00000000000..2e53367d39f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Stores Abfs operation metrics during each analysis period.
+ */
+class AbfsOperationMetrics {
+
+  /**
+   * No of bytes which could not be transferred due to a failed operation.
+  */
+  private final AtomicLong bytesFailed;
+
+  /**
+   * No of bytes successfully transferred during a successful operation.
+   */
+  private final AtomicLong bytesSuccessful;
+
+  /**
+   * Total no of failed operations.
+   */
+  private final AtomicLong operationsFailed;
+
+  /**
+   * Total no of successful operations.
+   */
+  private final AtomicLong operationsSuccessful;
+
+  /**
+   * Time when collection of metrics ended.
+   */
+  private long endTime;
+
+  /**
+   * Time when the collection of metrics started.
+   */
+  private final long startTime;
+
+  AbfsOperationMetrics(long startTime) {
+    this.startTime = startTime;
+    this.bytesFailed = new AtomicLong();
+    this.bytesSuccessful = new AtomicLong();
+    this.operationsFailed = new AtomicLong();
+    this.operationsSuccessful = new AtomicLong();
+  }
+
+  /**
+   *
+   * @return bytes failed to transfer.
+   */
+  AtomicLong getBytesFailed() {
+    return bytesFailed;
+  }
+
+  /**
+   *
+   * @return bytes successfully transferred.
+   */
+  AtomicLong getBytesSuccessful() {
+    return bytesSuccessful;
+  }
+
+  /**
+   *
+   * @return no of operations failed.
+   */
+  AtomicLong getOperationsFailed() {
+    return operationsFailed;
+  }
+
+  /**
+   *
+   * @return no of successful operations.
+   */
+  AtomicLong getOperationsSuccessful() {
+    return operationsSuccessful;
+  }
+
+  /**
+   *
+   * @return end time of metric collection.
+   */
+  long getEndTime() {
+    return endTime;
+  }
+
+  /**
+   *
+   * @param endTime sets the end time.
+   */
+  void setEndTime(final long endTime) {
+    this.endTime = endTime;
+  }
+
+  /**
+   *
+   * @return start time of metric collection.
+   */
+  long getStartTime() {
+    return startTime;
+  }
+
+  void addBytesFailed(long bytes) {
+    this.getBytesFailed().addAndGet(bytes);
+  }
+
+  void addBytesSuccessful(long bytes) {
+    this.getBytesSuccessful().addAndGet(bytes);
+  }
+
+  void incrementOperationsFailed() {
+    this.getOperationsFailed().incrementAndGet();
+  }
+
+  void incrementOperationsSuccessful() {
+    this.getOperationsSuccessful().incrementAndGet();
+  }
+
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index c2a80f81770..2c669e9e101 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -45,6 +45,8 @@ public class AbfsRestOperation {
   private final AbfsRestOperationType operationType;
   // Blob FS client, which has the credentials, retry policy, and logs.
   private final AbfsClient client;
+  // Throttling intercept obtained from the client.
+  private final AbfsThrottlingIntercept intercept;
   // the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
   private final String method;
   // full URL including query parameters
@@ -145,6 +147,7 @@ public class AbfsRestOperation {
             || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
     this.sasToken = sasToken;
     this.abfsCounters = client.getAbfsCounters();
+    this.intercept = client.getIntercept();
   }
 
   /**
@@ -241,7 +244,8 @@ public class AbfsRestOperation {
    */
   private boolean executeHttpOperation(final int retryCount,
     TracingContext tracingContext) throws AzureBlobFileSystemException {
-    AbfsHttpOperation httpOperation = null;
+    AbfsHttpOperation httpOperation;
+
     try {
       // initialize the HTTP request and open the connection
       httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
@@ -278,8 +282,7 @@ public class AbfsRestOperation {
       // dump the headers
       AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
           httpOperation.getConnection().getRequestProperties());
-      AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
-
+      intercept.sendingRequest(operationType, abfsCounters);
       if (hasRequestBody) {
         // HttpUrlConnection requires
         httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
@@ -317,7 +320,7 @@ public class AbfsRestOperation {
 
       return false;
     } finally {
-      AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
+      intercept.updateMetrics(operationType, httpOperation);
     }
 
     LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString());
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
new file mode 100644
index 00000000000..57b5095bb32
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Interface for ABFS throttling intercepts.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface AbfsThrottlingIntercept {
+
+  /**
+   * Updates the metrics for successful and failed read and write operations.
+   * @param operationType Only applicable for read and write operations.
+   * @param abfsHttpOperation Used for status code and data transferred.
+   */
+  void updateMetrics(AbfsRestOperationType operationType,
+      AbfsHttpOperation abfsHttpOperation);
+
+  /**
+   * Called before the request is sent.  Client-side throttling
+   * uses this to suspend the request, if necessary, to minimize errors and
+   * maximize throughput.
+   * @param operationType Only applicable for read and write operations.
+   * @param abfsCounters Used for counters.
+   */
+  void sendingRequest(AbfsRestOperationType operationType,
+      AbfsCounters abfsCounters);
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java
new file mode 100644
index 00000000000..279b7a318ca
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.util.WeakReferenceMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to get an instance of throttling intercept class per account.
+ */
+final class AbfsThrottlingInterceptFactory {
+
+  private AbfsThrottlingInterceptFactory() {
+  }
+
+  private static AbfsConfiguration abfsConfig;
+
+  /**
+   * List of references notified of loss.
+   */
+  private static List<String> lostReferences = new ArrayList<>();
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsThrottlingInterceptFactory.class);
+
+  /**
+   * Map which stores instance of ThrottlingIntercept class per account.
+   */
+  private static WeakReferenceMap<String, AbfsThrottlingIntercept>
+      interceptMap = new WeakReferenceMap<>(
+      AbfsThrottlingInterceptFactory::factory,
+      AbfsThrottlingInterceptFactory::referenceLost);
+
+  /**
+   * Returns instance of throttling intercept.
+   * @param accountName Account name.
+   * @return instance of throttling intercept.
+   */
+  private static AbfsClientThrottlingIntercept factory(final String accountName) {
+    return new AbfsClientThrottlingIntercept(accountName, abfsConfig);
+  }
+
+  /**
+   * Reference lost callback.
+   * @param accountName key lost.
+   */
+  private static void referenceLost(String accountName) {
+    lostReferences.add(accountName);
+  }
+
+  /**
+   * Returns an instance of AbfsThrottlingIntercept.
+   *
+   * @param accountName The account for which we need instance of throttling intercept.
+   * @param abfsConfiguration The AbfsConfiguration to read throttling settings from.
+   * @return Instance of AbfsThrottlingIntercept.
+   */
+  static synchronized AbfsThrottlingIntercept getInstance(String accountName,
+      AbfsConfiguration abfsConfiguration) {
+    abfsConfig =  abfsConfiguration;
+    AbfsThrottlingIntercept intercept;
+    if (!abfsConfiguration.isAutoThrottlingEnabled()) {
+      return AbfsNoOpThrottlingIntercept.INSTANCE;
+    }
+    // If account level throttling is disabled, use the shared singleton intercept for all accounts
+    if (!abfsConfiguration.accountThrottlingEnabled()) {
+      intercept = AbfsClientThrottlingIntercept.initializeSingleton(
+          abfsConfiguration);
+    } else {
+      // Return the instance from the map
+      intercept = interceptMap.get(accountName);
+      if (intercept == null) {
+        intercept = new AbfsClientThrottlingIntercept(accountName,
+            abfsConfiguration);
+        interceptMap.put(accountName, intercept);
+      }
+    }
+    return intercept;
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
new file mode 100644
index 00000000000..bf7da69ec49
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+public enum TimerFunctionality {
+  RESUME,
+
+  SUSPEND
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 35d36055604..31498df1790 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -767,6 +767,12 @@ Hflush() being the only documented API that can provide persistent data
 transfer, Flush() also attempting to persist buffered data will lead to
 performance issues.
 
+
+### <a name="accountlevelthrottlingoptions"></a> Account level throttling Options
+
+`fs.azure.account.operation.idle.timeout`: The idle time after which the timer for the analyzer (read or
+write) is paused until a new request is made. The default value is 60 seconds.
+
 ### <a name="hnscheckconfigoptions"></a> HNS Check Options
 Config `fs.azure.account.hns.enabled` provides an option to specify whether
  the storage account is HNS enabled or not. In case the config is not provided,
@@ -877,6 +883,9 @@ when there are too many writes from the same process.
  tuned with this config considering each queued request holds a buffer. Set
  the value 3 or 4 times the value set for s.azure.write.max.concurrent.requests.
 
+`fs.azure.analysis.period`: The interval at which the sleep duration is recomputed from the analyzed metrics. The default
+value is 10 seconds.
+
 ### <a name="securityconfigoptions"></a> Security Options
 `fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
 is made true. Irrespective of the flag, `AbfsClient` will use HTTPS if the secure
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
index 565eb38c4f7..9e40f22d231 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
@@ -24,6 +24,9 @@ package org.apache.hadoop.fs.azurebfs.constants;
 public final class TestConfigurationKeys {
   public static final String FS_AZURE_ACCOUNT_NAME = "fs.azure.account.name";
   public static final String FS_AZURE_ABFS_ACCOUNT_NAME = "fs.azure.abfs.account.name";
+  public static final String FS_AZURE_ABFS_ACCOUNT1_NAME = "fs.azure.abfs.account1.name";
+  public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
+  public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
   public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key";
   public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
   public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 0a1dca7e7d8..08eb3adc926 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -306,6 +306,11 @@ public final class TestAbfsClient {
     when(client.getAccessToken()).thenCallRealMethod();
     when(client.getSharedKeyCredentials()).thenCallRealMethod();
     when(client.createDefaultHeaders()).thenCallRealMethod();
+    when(client.getAbfsConfiguration()).thenReturn(abfsConfig);
+    when(client.getIntercept()).thenReturn(
+        AbfsThrottlingInterceptFactory.getInstance(
+            abfsConfig.getAccountName().substring(0,
+                abfsConfig.getAccountName().indexOf(DOT)), abfsConfig));
 
     // override baseurl
     client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
index 3f680e49930..22649cd190d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
@@ -18,9 +18,15 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.junit.Test;
 
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ANALYSIS_PERIOD;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -33,6 +39,15 @@ public class TestAbfsClientThrottlingAnalyzer {
       + ANALYSIS_PERIOD / 10;
   private static final long MEGABYTE = 1024 * 1024;
   private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
+  private AbfsConfiguration abfsConfiguration;
+
+  public TestAbfsClientThrottlingAnalyzer() throws IOException, IllegalAccessException {
+    final Configuration configuration = new Configuration();
+    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+    configuration.setInt(FS_AZURE_ANALYSIS_PERIOD, 1000);
+    this.abfsConfiguration = new AbfsConfiguration(configuration,
+            "dummy");
+  }
 
   private void sleep(long milliseconds) {
     try {
@@ -82,8 +97,7 @@ public class TestAbfsClientThrottlingAnalyzer {
   @Test
   public void testNoMetricUpdatesThenNoWaiting() {
     AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
-        "test",
-        ANALYSIS_PERIOD);
+        "test", abfsConfiguration);
     validate(0, analyzer.getSleepDuration());
     sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
     validate(0, analyzer.getSleepDuration());
@@ -96,8 +110,7 @@ public class TestAbfsClientThrottlingAnalyzer {
   @Test
   public void testOnlySuccessThenNoWaiting() {
     AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
-        "test",
-        ANALYSIS_PERIOD);
+        "test", abfsConfiguration);
     analyzer.addBytesTransferred(8 * MEGABYTE, false);
     validate(0, analyzer.getSleepDuration());
     sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
@@ -112,8 +125,7 @@ public class TestAbfsClientThrottlingAnalyzer {
   @Test
   public void testOnlyErrorsAndWaiting() {
     AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
-        "test",
-        ANALYSIS_PERIOD);
+        "test", abfsConfiguration);
     validate(0, analyzer.getSleepDuration());
     analyzer.addBytesTransferred(4 * MEGABYTE, true);
     sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
@@ -132,8 +144,7 @@ public class TestAbfsClientThrottlingAnalyzer {
   @Test
   public void testSuccessAndErrorsAndWaiting() {
     AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
-        "test",
-        ANALYSIS_PERIOD);
+        "test", abfsConfiguration);
     validate(0, analyzer.getSleepDuration());
     analyzer.addBytesTransferred(8 * MEGABYTE, false);
     analyzer.addBytesTransferred(2 * MEGABYTE, true);
@@ -157,8 +168,7 @@ public class TestAbfsClientThrottlingAnalyzer {
   @Test
   public void testManySuccessAndErrorsAndWaiting() {
     AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
-        "test",
-        ANALYSIS_PERIOD);
+        "test", abfsConfiguration);
     validate(0, analyzer.getSleepDuration());
     final int numberOfRequests = 20;
     for (int i = 0; i < numberOfRequests; i++) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
index 0f8dc55aa14..a1fc4e138d6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
@@ -18,13 +18,35 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_BACKOFF_INTERVAL;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_AUTOTHROTTLING;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT1_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
+
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.fs.FSDataInputStream;
 
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.mockito.Mockito;
+
+import java.net.URI;
 import java.util.Random;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -41,6 +63,9 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
   private final int noRetryCount = 0;
   private final int retryCount = new Random().nextInt(maxRetryCount);
   private final int retryCountBeyondMax = maxRetryCount + 1;
+  private static final String TEST_PATH = "/testfile";
+  private static final double MULTIPLYING_FACTOR = 1.5;
+  private static final int ANALYSIS_PERIOD = 10000;
 
 
   public TestExponentialRetryPolicy() throws Exception {
@@ -67,6 +92,173 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
     testMaxIOConfig(abfsConfig);
   }
 
+  @Test
+  public void testThrottlingIntercept() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    final Configuration configuration = new Configuration();
+    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+    configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, false);
+
+    // On disabling throttling AbfsNoOpThrottlingIntercept object is returned
+    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
+        "dummy.dfs.core.windows.net");
+    AbfsThrottlingIntercept intercept;
+    AbfsClient abfsClient = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
+    intercept = abfsClient.getIntercept();
+    Assertions.assertThat(intercept)
+        .describedAs("AbfsNoOpThrottlingIntercept instance expected")
+        .isInstanceOf(AbfsNoOpThrottlingIntercept.class);
+
+    configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, true);
+    configuration.setBoolean(FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED, true);
+    // On enabling throttling AbfsClientThrottlingIntercept object is returned
+    AbfsConfiguration abfsConfiguration1 = new AbfsConfiguration(configuration,
+        "dummy1.dfs.core.windows.net");
+    AbfsClient abfsClient1 = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
+    intercept = abfsClient1.getIntercept();
+    Assertions.assertThat(intercept)
+        .describedAs("AbfsClientThrottlingIntercept instance expected")
+        .isInstanceOf(AbfsClientThrottlingIntercept.class);
+  }
+
+  @Test
+  public void testCreateMultipleAccountThrottling() throws Exception {
+    Configuration config = new Configuration(getRawConfiguration());
+    String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+    if (accountName == null) {
+      // check if accountName is set using different config key
+      accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+    }
+    assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+        accountName != null && !accountName.isEmpty());
+
+    Configuration rawConfig1 = new Configuration();
+    rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+    AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+    AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+    when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+    when(successOp.getResult()).thenReturn(http500Op);
+
+    AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+    when(configuration.getAnalysisPeriod()).thenReturn(ANALYSIS_PERIOD);
+    when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+    when(configuration.accountThrottlingEnabled()).thenReturn(false);
+
+    AbfsThrottlingIntercept instance1 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
+    String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+    assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+        accountName1 != null && !accountName1.isEmpty());
+
+    AbfsThrottlingIntercept instance2 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    //if singleton is enabled, for different accounts both the instances should return same value
+    Assertions.assertThat(instance1)
+        .describedAs(
+            "if singleton is enabled, for different accounts both the instances should return same value")
+        .isEqualTo(instance2);
+
+    when(configuration.accountThrottlingEnabled()).thenReturn(true);
+    AbfsThrottlingIntercept instance3 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
+    AbfsThrottlingIntercept instance4 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    AbfsThrottlingIntercept instance5 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
+    //if singleton is not enabled, for different accounts instances should return different value
+    Assertions.assertThat(instance3)
+        .describedAs(
+            "iff singleton is not enabled, for different accounts instances should return different value")
+        .isNotEqualTo(instance4);
+
+    //if singleton is not enabled, for same accounts instances should return same value
+    Assertions.assertThat(instance3)
+        .describedAs(
+            "if singleton is not enabled, for same accounts instances should return same value")
+        .isEqualTo(instance5);
+  }
+
+  @Test
+  public void testOperationOnAccountIdle() throws Exception {
+    //Get the filesystem.
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = fs.getAbfsStore().getClient();
+    AbfsConfiguration configuration1 = client.getAbfsConfiguration();
+    Assume.assumeTrue(configuration1.isAutoThrottlingEnabled());
+    Assume.assumeTrue(configuration1.accountThrottlingEnabled());
+
+    AbfsClientThrottlingIntercept accountIntercept
+        = (AbfsClientThrottlingIntercept) client.getIntercept();
+    final byte[] b = new byte[2 * MIN_BUFFER_SIZE];
+    new Random().nextBytes(b);
+
+    Path testPath = path(TEST_PATH);
+
+    //Do an operation on the filesystem.
+    try (FSDataOutputStream stream = fs.create(testPath)) {
+      stream.write(b);
+    }
+
+    //Don't perform any operation on the account.
+    int sleepTime = (int) ((getAbfsConfig().getAccountOperationIdleTimeout()) * MULTIPLYING_FACTOR);
+    Thread.sleep(sleepTime);
+
+    try (FSDataInputStream streamRead = fs.open(testPath)) {
+      streamRead.read(b);
+    }
+
+    //Perform operations on another account.
+    AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+    Configuration config = new Configuration(getRawConfiguration());
+    String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+    assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+        accountName1 != null && !accountName1.isEmpty());
+    final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+    URI defaultUri1 = null;
+    defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+    fs1.initialize(defaultUri1, getRawConfiguration());
+    AbfsClient client1 = fs1.getAbfsStore().getClient();
+    AbfsClientThrottlingIntercept accountIntercept1
+        = (AbfsClientThrottlingIntercept) client1.getIntercept();
+    try (FSDataOutputStream stream1 = fs1.create(testPath)) {
+      stream1.write(b);
+    }
+
+    //Verify the write analyzer for first account is idle but the read analyzer is not idle.
+    Assertions.assertThat(accountIntercept.getWriteThrottler()
+            .getIsOperationOnAccountIdle()
+            .get())
+        .describedAs("Write analyzer for first account should be idle the first time")
+        .isTrue();
+
+    Assertions.assertThat(
+            accountIntercept.getReadThrottler()
+                .getIsOperationOnAccountIdle()
+                .get())
+        .describedAs("Read analyzer for first account should not be idle")
+        .isFalse();
+
+    //Verify the write analyzer for second account is not idle.
+    Assertions.assertThat(
+            accountIntercept1.getWriteThrottler()
+                .getIsOperationOnAccountIdle()
+                .get())
+        .describedAs("Write analyzer for second account should not be idle")
+        .isFalse();
+
+    //Again perform an operation on the first account.
+    try (FSDataOutputStream stream2 = fs.create(testPath)) {
+      stream2.write(b);
+    }
+
+    //Verify the write analyzer on first account is not idle.
+    Assertions.assertThat(
+
+            accountIntercept.getWriteThrottler()
+                .getIsOperationOnAccountIdle()
+                .get())
+        .describedAs(
+            "Write analyzer for first account should not be idle second time")
+        .isFalse();
+  }
+
   @Test
   public void testAbfsConfigConstructor() throws Exception {
     // Ensure we choose expected values that are not defaults


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org