Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2018/09/23 03:24:33 UTC

[36/45] hadoop git commit: HADOOP-15703. ABFS - Implement client-side throttling. Contributed by Sneha Varma and Thomas Marquardt.

HADOOP-15703. ABFS - Implement client-side throttling.
Contributed by Sneha Varma and Thomas Marquardt.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/97f06b3f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/97f06b3f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/97f06b3f

Branch: refs/heads/trunk
Commit: 97f06b3fc70ad509e601076c015bc244daa1243f
Parents: 4410eac
Author: Thomas Marquardt <tm...@microsoft.com>
Authored: Mon Sep 3 01:37:10 2018 +0000
Committer: Thomas Marquardt <tm...@microsoft.com>
Committed: Mon Sep 17 19:54:01 2018 +0000

----------------------------------------------------------------------
 .../fs/azure/ClientThrottlingAnalyzer.java      |   2 +-
 .../hadoop/fs/azurebfs/AbfsConfiguration.java   |   8 +
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java |   3 +
 .../azurebfs/constants/ConfigurationKeys.java   |   2 +-
 .../constants/FileSystemConfigurations.java     |   1 +
 .../hadoop/fs/azurebfs/services/AbfsClient.java |  17 ++
 .../services/AbfsClientThrottlingAnalyzer.java  | 272 +++++++++++++++++++
 .../services/AbfsClientThrottlingIntercept.java | 117 ++++++++
 .../fs/azurebfs/services/AbfsRestOperation.java |  15 +-
 .../services/AbfsRestOperationType.java         |  42 +++
 .../TestAbfsClientThrottlingAnalyzer.java       | 159 +++++++++++
 11 files changed, 633 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
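
The feature is switched by a single boolean key, renamed in this patch from fs.azure.autothrottling.enable to fs.azure.enable.autothrottling and defaulted to true (see the ConfigurationKeys and FileSystemConfigurations hunks below). As a rough illustration only, a client that wants to opt out could clear the flag before creating the file system; the account URI below is a placeholder and the usual credential settings are omitted:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Illustrative sketch (not part of the patch): disable ABFS client-side
// throttling, which HADOOP-15703 enables by default.
public class DisableAutoThrottlingSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("fs.azure.enable.autothrottling", false);
    // Placeholder URI; real use also requires the usual fs.azure.* credentials.
    URI uri = URI.create("abfs://container@account.dfs.core.windows.net/");
    try (FileSystem fs = FileSystem.newInstance(uri, conf)) {
      System.out.println("Auto-throttling disabled for " + fs.getUri());
    }
  }
}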


http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
index 850e552..859a608 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
@@ -99,7 +99,7 @@ class ClientThrottlingAnalyzer {
     this.blobMetrics = new AtomicReference<BlobOperationMetrics>(
         new BlobOperationMetrics(System.currentTimeMillis()));
     this.timer = new Timer(
-        String.format("wasb-timer-client-throttling-analyzer-%s", name));
+        String.format("wasb-timer-client-throttling-analyzer-%s", name), true);
     this.timer.schedule(new TimerTaskImpl(),
         analysisPeriodMs,
         analysisPeriodMs);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 924bc3e..518fef9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -141,6 +141,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_ENABLE_FLUSH)
   private boolean enableFlush;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING,
+      DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
+  private boolean enableAutoThrottling;
+
   @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
       DefaultValue = "")
   private String userAgentId;
@@ -279,6 +283,10 @@ public class AbfsConfiguration{
     return this.enableFlush;
   }
 
+  public boolean isAutoThrottlingEnabled() {
+    return this.enableAutoThrottling;
+  }
+
   public String getCustomUserAgentPrefix() {
     return this.userAgentId;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index b809192..c0ecc35 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Future;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,6 +131,8 @@ public class AzureBlobFileSystem extends FileSystem {
         this.delegationTokenManager = abfsStore.getAbfsConfiguration().getDelegationTokenManager();
       }
     }
+    
+    AbfsClientThrottlingIntercept.initializeSingleton(abfsStore.getAbfsConfiguration().isAutoThrottlingEnabled());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index ca4c9c3..5236719 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -47,7 +47,7 @@ public final class ConfigurationKeys {
   public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
   public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
   public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
-  public static final String FS_AZURE_AUTOTHROTTLING_ENABLE = "fs.azure.autothrottling.enable";
+  public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
   public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
   public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
   public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index a921faf..a9412a9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -57,6 +57,7 @@ public final class FileSystemConfigurations {
 
   public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
   public static final boolean DEFAULT_ENABLE_FLUSH = true;
+  public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
 
   public static final SSLSocketFactoryEx.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
       = SSLSocketFactoryEx.SSLChannelMode.Default;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 18773b6..258045a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -125,6 +125,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.CreateFileSystem,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -148,6 +149,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.SetFileSystemProperties,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -170,6 +172,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.ListPaths,
             this,
             HTTP_METHOD_GET,
             url,
@@ -186,6 +189,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.GetFileSystemProperties,
             this,
             HTTP_METHOD_HEAD,
             url,
@@ -202,6 +206,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.DeleteFileSystem,
             this,
             HTTP_METHOD_DELETE,
             url,
@@ -230,6 +235,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.CreatePath,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -251,6 +257,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.RenamePath,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -273,6 +280,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.Append,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -296,6 +304,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.Flush,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -319,6 +328,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.SetPathProperties,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -334,6 +344,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.GetPathProperties,
             this,
             HTTP_METHOD_HEAD,
             url,
@@ -354,6 +365,7 @@ public class AbfsClient {
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
 
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.ReadFile,
             this,
             HTTP_METHOD_GET,
             url,
@@ -376,6 +388,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.DeletePath,
             this,
             HTTP_METHOD_DELETE,
             url,
@@ -404,6 +417,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.SetOwner,
         this,
         AbfsHttpConstants.HTTP_METHOD_PUT,
         url,
@@ -427,6 +441,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.SetPermissions,
         this,
         AbfsHttpConstants.HTTP_METHOD_PUT,
         url,
@@ -458,6 +473,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.SetAcl,
         this,
         AbfsHttpConstants.HTTP_METHOD_PUT,
         url,
@@ -474,6 +490,7 @@ public class AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.GetAcl,
         this,
         AbfsHttpConstants.HTTP_METHOD_HEAD,
         url,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
new file mode 100644
index 0000000..f1e5aaa
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AbfsClientThrottlingAnalyzer {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsClientThrottlingAnalyzer.class);
+  private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
+  private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
+  private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
+  private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
+  private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1;
+  private static final double RAPID_SLEEP_DECREASE_FACTOR = .75;
+  private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150
+      * 1000;
+  private static final double SLEEP_DECREASE_FACTOR = .975;
+  private static final double SLEEP_INCREASE_FACTOR = 1.05;
+  private int analysisPeriodMs;
+
+  private volatile int sleepDuration = 0;
+  private long consecutiveNoErrorCount = 0;
+  private String name = null;
+  private Timer timer = null;
+  private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
+
+  private AbfsClientThrottlingAnalyzer() {
+    // hide default constructor
+  }
+
+  /**
+   * Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
+   * the specified name.
+   *
+   * @param name a name used to identify this instance.
+   * @throws IllegalArgumentException if name is null or empty.
+   */
+  AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
+    this(name, DEFAULT_ANALYSIS_PERIOD_MS);
+  }
+
+  /**
+   * Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
+   * the specified name and period.
+   *
+   * @param name   A name used to identify this instance.
+   * @param period The frequency, in milliseconds, at which metrics are
+   *               analyzed.
+   * @throws IllegalArgumentException If name is null or empty.
+   *                                  If period is less than 1000 or greater than 30000 milliseconds.
+   */
+  AbfsClientThrottlingAnalyzer(String name, int period)
+      throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        StringUtils.isNotEmpty(name),
+        "The argument 'name' cannot be null or empty.");
+    Preconditions.checkArgument(
+        period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
+        "The argument 'period' must be between 1000 and 30000.");
+    this.name = name;
+    this.analysisPeriodMs = period;
+    this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
+        new AbfsOperationMetrics(System.currentTimeMillis()));
+    this.timer = new Timer(
+        String.format("abfs-timer-client-throttling-analyzer-%s", name), true);
+    this.timer.schedule(new TimerTaskImpl(),
+        analysisPeriodMs,
+        analysisPeriodMs);
+  }
+
+  /**
+   * Updates metrics with results from the current storage operation.
+   *
+   * @param count             The count of bytes transferred.
+   * @param isFailedOperation True if the operation failed; otherwise false.
+   */
+  public void addBytesTransferred(long count, boolean isFailedOperation) {
+    AbfsOperationMetrics metrics = blobMetrics.get();
+    if (isFailedOperation) {
+      metrics.bytesFailed.addAndGet(count);
+      metrics.operationsFailed.incrementAndGet();
+    } else {
+      metrics.bytesSuccessful.addAndGet(count);
+      metrics.operationsSuccessful.incrementAndGet();
+    }
+  }
+
+  /**
+   * Suspends the current storage operation, as necessary, to reduce throughput.
+   */
+  public void suspendIfNecessary() {
+    int duration = sleepDuration;
+    if (duration > 0) {
+      try {
+        Thread.sleep(duration);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  int getSleepDuration() {
+    return sleepDuration;
+  }
+
+  private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
+                                                   int sleepDuration) {
+    final double percentageConversionFactor = 100;
+    double bytesFailed = metrics.bytesFailed.get();
+    double bytesSuccessful = metrics.bytesSuccessful.get();
+    double operationsFailed = metrics.operationsFailed.get();
+    double operationsSuccessful = metrics.operationsSuccessful.get();
+    double errorPercentage = (bytesFailed <= 0)
+        ? 0
+        : (percentageConversionFactor
+        * bytesFailed
+        / (bytesFailed + bytesSuccessful));
+    long periodMs = metrics.endTime - metrics.startTime;
+
+    double newSleepDuration;
+
+    if (errorPercentage < MIN_ACCEPTABLE_ERROR_PERCENTAGE) {
+      ++consecutiveNoErrorCount;
+      // Decrease sleepDuration in order to increase throughput.
+      double reductionFactor =
+          (consecutiveNoErrorCount * analysisPeriodMs
+              >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
+              ? RAPID_SLEEP_DECREASE_FACTOR
+              : SLEEP_DECREASE_FACTOR;
+
+      newSleepDuration = sleepDuration * reductionFactor;
+    } else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) {
+      // Do not modify sleepDuration in order to stabilize throughput.
+      newSleepDuration = sleepDuration;
+    } else {
+      // Increase sleepDuration in order to minimize error rate.
+      consecutiveNoErrorCount = 0;
+
+      // Increase sleep duration in order to reduce throughput and error rate.
+      // First, calculate target throughput: bytesSuccessful / periodMs.
+      // Next, calculate time required to send *all* data (assuming next period
+      // is similar to previous) at the target throughput: (bytesSuccessful
+      // + bytesFailed) * periodMs / bytesSuccessful. Next, subtract periodMs to
+      // get the total additional delay needed.
+      double additionalDelayNeeded = 5 * analysisPeriodMs;
+      if (bytesSuccessful > 0) {
+        additionalDelayNeeded = (bytesSuccessful + bytesFailed)
+            * periodMs
+            / bytesSuccessful
+            - periodMs;
+      }
+
+      // amortize the additional delay needed across the estimated number of
+      // requests during the next period
+      newSleepDuration = additionalDelayNeeded
+          / (operationsFailed + operationsSuccessful);
+
+      final double maxSleepDuration = analysisPeriodMs;
+      final double minSleepDuration = sleepDuration * SLEEP_INCREASE_FACTOR;
+
+      // Add 1 ms to avoid rounding down and to decrease proximity to the server
+      // side ingress/egress limit.  Ensure that the new sleep duration is
+      // larger than the current one to more quickly reduce the number of
+      // errors.  Don't allow the sleep duration to grow unbounded, after a
+      // certain point throttling won't help, for example, if there are far too
+      // many tasks/containers/nodes no amount of throttling will help.
+      newSleepDuration = Math.max(newSleepDuration, minSleepDuration) + 1;
+      newSleepDuration = Math.min(newSleepDuration, maxSleepDuration);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format(
+          "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
+          name,
+          (int) bytesFailed,
+          (int) bytesSuccessful,
+          (int) operationsFailed,
+          (int) operationsSuccessful,
+          errorPercentage,
+          periodMs,
+          (int) sleepDuration,
+          (int) newSleepDuration));
+    }
+
+    return (int) newSleepDuration;
+  }
+
+  /**
+   * Timer callback implementation for periodically analyzing metrics.
+   */
+  class TimerTaskImpl extends TimerTask {
+    private AtomicInteger doingWork = new AtomicInteger(0);
+
+    /**
+     * Periodically analyzes a snapshot of the blob storage metrics and updates
+     * the sleepDuration in order to appropriately throttle storage operations.
+     */
+    @Override
+    public void run() {
+      boolean doWork = false;
+      try {
+        doWork = doingWork.compareAndSet(0, 1);
+
+        // prevent concurrent execution of this task
+        if (!doWork) {
+          return;
+        }
+
+        long now = System.currentTimeMillis();
+        if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
+          AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
+              new AbfsOperationMetrics(now));
+          oldMetrics.endTime = now;
+          sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
+              sleepDuration);
+        }
+      } finally {
+        if (doWork) {
+          doingWork.set(0);
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores Abfs operation metrics during each analysis period.
+   */
+  static class AbfsOperationMetrics {
+    private AtomicLong bytesFailed;
+    private AtomicLong bytesSuccessful;
+    private AtomicLong operationsFailed;
+    private AtomicLong operationsSuccessful;
+    private long endTime;
+    private long startTime;
+
+    AbfsOperationMetrics(long startTime) {
+      this.startTime = startTime;
+      this.bytesFailed = new AtomicLong();
+      this.bytesSuccessful = new AtomicLong();
+      this.operationsFailed = new AtomicLong();
+      this.operationsSuccessful = new AtomicLong();
+    }
+  }
+}
\ No newline at end of file
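
The comment block inside analyzeMetricsAndUpdateSleepDuration above describes the delay calculation in words. The standalone sketch below (illustrative names, not part of the patch) walks the same arithmetic with the figures used by testSuccessAndErrorsAndWaiting further down: 8 MB successful, 2 MB failed, and an assumed 1000 ms analysis period, which works out to the roughly 126 ms per-request sleep that test expects:

// Illustrative arithmetic only; the real analyzer also clamps the result
// between sleepDuration * SLEEP_INCREASE_FACTOR and analysisPeriodMs, which
// changes nothing here because the previous sleepDuration was 0.
public class ThrottlingMathSketch {
  public static void main(String[] args) {
    double bytesSuccessful = 8 * 1024 * 1024;   // 8 MB sent successfully
    double bytesFailed = 2 * 1024 * 1024;       // 2 MB failed (throttled)
    double operations = 2;                      // one successful + one failed request
    double periodMs = 1000;                     // assumed analysis period

    // Time to send *all* the data at the observed successful throughput,
    // minus the period itself, is the extra delay to introduce.
    double additionalDelayNeeded =
        (bytesSuccessful + bytesFailed) * periodMs / bytesSuccessful - periodMs;  // 250 ms

    // Amortize across the requests expected next period; +1 ms avoids
    // rounding down.
    double newSleepDuration = additionalDelayNeeded / operations + 1;             // 126 ms

    System.out.printf("additional delay %.0f ms, per-request sleep %.0f ms%n",
        additionalDelayNeeded, newSleepDuration);
  }
}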

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
new file mode 100644
index 0000000..e981d76
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.HttpURLConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Throttles Azure Blob File System read and write operations to achieve maximum
+ * throughput by minimizing errors.  The errors occur when the account ingress
+ * or egress limits are exceeded and the server-side throttles requests.
+ * Server-side throttling causes the retry policy to be used, but the retry
+ * policy sleeps for long periods of time causing the total ingress or egress
+ * throughput to be as much as 35% lower than optimal.  The retry policy is also
+ * after the fact, in that it applies after a request fails.  On the other hand,
+ * the client-side throttling implemented here happens before requests are made
+ * and sleeps just enough to minimize errors, allowing optimal ingress and/or
+ * egress throughput.
+ */
+public final class AbfsClientThrottlingIntercept {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsClientThrottlingIntercept.class);
+  private static AbfsClientThrottlingIntercept singleton = null;
+  private AbfsClientThrottlingAnalyzer readThrottler = null;
+  private AbfsClientThrottlingAnalyzer writeThrottler = null;
+  private static boolean isAutoThrottlingEnabled = false;
+
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept() {
+    readThrottler = new AbfsClientThrottlingAnalyzer("read");
+    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+    isAutoThrottlingEnabled = true;
+    LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+  }
+
+  public static synchronized void initializeSingleton(boolean isAutoThrottlingEnabled) {
+    if (!isAutoThrottlingEnabled) {
+      return;
+    }
+    if (singleton == null) {
+      singleton = new AbfsClientThrottlingIntercept();
+    }
+  }
+
+  static void updateMetrics(AbfsRestOperationType operationType,
+                            AbfsHttpOperation abfsHttpOperation) {
+    if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
+      return;
+    }
+
+    int status = abfsHttpOperation.getStatusCode();
+    long contentLength = 0;
+    // If the socket is terminated prior to receiving a response, the HTTP
+    // status may be 0 or -1.  A status less than 200 or greater than or equal
+    // to 500 is considered an error.
+    boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
+        || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
+
+    switch (operationType) {
+      case Append:
+        contentLength = abfsHttpOperation.getBytesSent();
+        if (contentLength > 0) {
+          singleton.writeThrottler.addBytesTransferred(contentLength,
+              isFailedOperation);
+        }
+        break;
+      case ReadFile:
+        contentLength = abfsHttpOperation.getBytesReceived();
+        if (contentLength > 0) {
+          singleton.readThrottler.addBytesTransferred(contentLength,
+              isFailedOperation);
+        }
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Called before the request is sent.  Client-side throttling
+   * uses this to suspend the request, if necessary, to minimize errors and
+   * maximize throughput.
+   */
+  static void sendingRequest(AbfsRestOperationType operationType) {
+    if (!isAutoThrottlingEnabled) {
+      return;
+    }
+
+    switch (operationType) {
+      case ReadFile:
+        singleton.readThrottler.suspendIfNecessary();
+        break;
+      case Append:
+        singleton.writeThrottler.suspendIfNecessary();
+        break;
+      default:
+        break;
+    }
+  }
+}
\ No newline at end of file
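
updateMetrics above counts any HTTP status outside [200, 500) as a failed operation, which deliberately sweeps in the 0 and -1 statuses seen when the socket is closed before a response arrives. A self-contained sketch of just that classification (illustrative, not part of the patch):

import java.net.HttpURLConnection;

// Standalone illustration of the success/failure classification used by
// AbfsClientThrottlingIntercept.updateMetrics.
public class StatusClassificationSketch {
  static boolean isFailedOperation(int status) {
    return status < HttpURLConnection.HTTP_OK               // < 200
        || status >= HttpURLConnection.HTTP_INTERNAL_ERROR; // >= 500
  }

  public static void main(String[] args) {
    System.out.println(isFailedOperation(200));  // false: success
    System.out.println(isFailedOperation(503));  // true: server busy / throttled
    System.out.println(isFailedOperation(-1));   // true: connection dropped
  }
}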

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index c0407f5..9a71879 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
  * The AbfsRestOperation for Rest AbfsClient.
  */
 public class AbfsRestOperation {
+  // The type of the REST operation (Append, ReadFile, etc)
+  private final AbfsRestOperationType operationType;
   // Blob FS client, which has the credentials, retry policy, and logs.
   private final AbfsClient client;
   // the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
@@ -71,10 +73,12 @@ public class AbfsRestOperation {
    * @param url The full URL including query string parameters.
    * @param requestHeaders The HTTP request headers.
    */
-  AbfsRestOperation(final AbfsClient client,
+  AbfsRestOperation(final AbfsRestOperationType operationType,
+                    final AbfsClient client,
                     final String method,
                     final URL url,
                     final List<AbfsHttpHeader> requestHeaders) {
+    this.operationType = operationType;
     this.client = client;
     this.method = method;
     this.url = url;
@@ -86,6 +90,7 @@ public class AbfsRestOperation {
   /**
    * Initializes a new REST operation.
    *
+   * @param operationType The type of the REST operation (Append, ReadFile, etc).
    * @param client The Blob FS client.
    * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
    * @param url The full URL including query string parameters.
@@ -95,14 +100,15 @@ public class AbfsRestOperation {
   * @param bufferOffset An offset into the buffer where the data begins.
    * @param bufferLength The length of the data in the buffer.
    */
-  AbfsRestOperation(AbfsClient client,
+  AbfsRestOperation(AbfsRestOperationType operationType,
+                    AbfsClient client,
                     String method,
                     URL url,
                     List<AbfsHttpHeader> requestHeaders,
                     byte[] buffer,
                     int bufferOffset,
                     int bufferLength) {
-    this(client, method, url, requestHeaders);
+    this(operationType, client, method, url, requestHeaders);
     this.buffer = buffer;
     this.bufferOffset = bufferOffset;
     this.bufferLength = bufferLength;
@@ -152,6 +158,7 @@ public class AbfsRestOperation {
 
       if (hasRequestBody) {
         // HttpUrlConnection requires
+        AbfsClientThrottlingIntercept.sendingRequest(operationType);
         httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
       }
 
@@ -168,6 +175,8 @@ public class AbfsRestOperation {
         throw new InvalidAbfsRestOperationException(ex);
       }
       return false;
+    } finally {
+      AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
     }
 
     LOG.debug("HttpRequest: " + httpOperation.toString());
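
Together with the intercept hooks, the AbfsRestOperation changes above bracket every request: sendingRequest may sleep before the payload goes on the wire, and updateMetrics runs in a finally block so the outcome is recorded even when the request throws. The sketch below is illustrative only, placed in the services package so it can reach the package-private hooks; HttpCall stands in for the real AbfsHttpOperation, which the real code hands to updateMetrics instead of null:

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;

// Illustrative sketch of the hook ordering introduced by this patch.
class ThrottlingHookSketch {
  interface HttpCall {
    void send() throws IOException;
  }

  static boolean executeOnce(AbfsRestOperationType operationType, HttpCall call) {
    try {
      // May sleep before the bytes go on the wire, per the analyzer's advice.
      AbfsClientThrottlingIntercept.sendingRequest(operationType);
      call.send();
      return true;
    } catch (IOException ex) {
      return false;
    } finally {
      // Always record the outcome; the real code passes the completed
      // AbfsHttpOperation here (updateMetrics simply ignores a null one).
      AbfsClientThrottlingIntercept.updateMetrics(operationType, null);
    }
  }
}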

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
new file mode 100644
index 0000000..eeea817
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * The REST operation type (Read, Append, Other).
+ */
+public enum AbfsRestOperationType {
+    CreateFileSystem,
+    GetFileSystemProperties,
+    SetFileSystemProperties,
+    ListPaths,
+    DeleteFileSystem,
+    CreatePath,
+    RenamePath,
+    GetAcl,
+    GetPathProperties,
+    SetAcl,
+    SetOwner,
+    SetPathProperties,
+    SetPermissions,
+    Append,
+    Flush,
+    ReadFile,
+    DeletePath
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
new file mode 100644
index 0000000..5105b85
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
@@ -0,0 +1,159 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for <code>AbfsClientThrottlingAnalyzer</code>.
+ */
+public class TestAbfsClientThrottlingAnalyzer {
+  private static final int ANALYSIS_PERIOD = 1000;
+  private static final int ANALYSIS_PERIOD_PLUS_10_PERCENT = ANALYSIS_PERIOD
+      + ANALYSIS_PERIOD / 10;
+  private static final long MEGABYTE = 1024 * 1024;
+  private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
+
+  private void sleep(long milliseconds) {
+    try {
+      Thread.sleep(milliseconds);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void fuzzyValidate(long expected, long actual, double percentage) {
+    final double lowerBound = Math.max(expected - percentage / 100 * expected, 0);
+    final double upperBound = expected + percentage / 100 * expected;
+
+    assertTrue(
+        String.format(
+            "The actual value %1$d is not within the expected range: "
+                + "[%2$.2f, %3$.2f].",
+            actual,
+            lowerBound,
+            upperBound),
+        actual >= lowerBound && actual <= upperBound);
+  }
+
+  private void validate(long expected, long actual) {
+    assertEquals(
+        String.format("The actual value %1$d is not the expected value %2$d.",
+            actual,
+            expected),
+        expected, actual);
+  }
+
+  private void validateLessThanOrEqual(long maxExpected, long actual) {
+    assertTrue(
+        String.format(
+            "The actual value %1$d is not less than or equal to the maximum"
+                + " expected value %2$d.",
+            actual,
+            maxExpected),
+        actual <= maxExpected);
+  }
+
+  /**
+   * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
+   * never been updated.  This validates proper initialization of
+   * ClientThrottlingAnalyzer.
+   */
+  @Test
+  public void testNoMetricUpdatesThenNoWaiting() {
+    AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    validate(0, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
+   * only been updated with successful requests.
+   */
+  @Test
+  public void testOnlySuccessThenNoWaiting() {
+    AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    analyzer.addBytesTransferred(8 * MEGABYTE, false);
+    validate(0, analyzer.getSleepDuration());
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    validate(0, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+   * only been updated with failed requests.  Also ensure that the
+   * sleepDuration decreases over time.
+   */
+  @Test
+  public void testOnlyErrorsAndWaiting() {
+    AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    analyzer.addBytesTransferred(4 * MEGABYTE, true);
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    final int expectedSleepDuration1 = 1100;
+    validateLessThanOrEqual(expectedSleepDuration1, analyzer.getSleepDuration());
+    sleep(10 * ANALYSIS_PERIOD);
+    final int expectedSleepDuration2 = 900;
+    validateLessThanOrEqual(expectedSleepDuration2, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+   * only been updated with both successful and failed requests.  Also ensure
+   * that the sleepDuration decreases over time.
+   */
+  @Test
+  public void testSuccessAndErrorsAndWaiting() {
+    AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    analyzer.addBytesTransferred(8 * MEGABYTE, false);
+    analyzer.addBytesTransferred(2 * MEGABYTE, true);
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    analyzer.suspendIfNecessary();
+    final int expectedElapsedTime = 126;
+    fuzzyValidate(expectedElapsedTime,
+        timer.elapsedTimeMs(),
+        MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
+    sleep(10 * ANALYSIS_PERIOD);
+    final int expectedSleepDuration = 110;
+    validateLessThanOrEqual(expectedSleepDuration, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+   * only been updated with many successful and failed requests.  Also ensure
+   * that the sleepDuration decreases to zero over time.
+   */
+  @Test
+  public void testManySuccessAndErrorsAndWaiting() {
+    AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    final int numberOfRequests = 20;
+    for (int i = 0; i < numberOfRequests; i++) {
+      analyzer.addBytesTransferred(8 * MEGABYTE, false);
+      analyzer.addBytesTransferred(2 * MEGABYTE, true);
+    }
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    analyzer.suspendIfNecessary();
+    fuzzyValidate(7,
+        timer.elapsedTimeMs(),
+        MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
+    sleep(10 * ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+  }
+}
\ No newline at end of file

