You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/08/01 20:37:18 UTC

hadoop git commit: HADOOP-14660 wasb: improve throughput by 34% when account limit exceeded. Contributed by Thomas Marquardt.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 777475983 -> 778d4edd9


HADOOP-14660 wasb: improve throughput by 34% when account limit exceeded.
Contributed by Thomas Marquardt.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/778d4edd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/778d4edd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/778d4edd

Branch: refs/heads/trunk
Commit: 778d4edd9adbe9519c3d6df65e45ddc8bb0ab2da
Parents: 7774759
Author: Steve Loughran <st...@apache.org>
Authored: Tue Aug 1 21:33:52 2017 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Aug 1 21:36:28 2017 +0100

----------------------------------------------------------------------
 .../hadoop/fs/contract/ContractTestUtils.java   |  10 +-
 .../fs/azure/AzureNativeFileSystemStore.java    | 102 ++++---
 .../fs/azure/BlobOperationDescriptor.java       | 222 ++++++++++++++
 .../fs/azure/ClientThrottlingAnalyzer.java      | 284 +++++++++++++++++
 .../fs/azure/ClientThrottlingIntercept.java     | 221 ++++++++++++++
 .../hadoop/fs/azure/AbstractWasbTestBase.java   |   4 +
 .../fs/azure/TestBlobOperationDescriptor.java   | 305 +++++++++++++++++++
 .../fs/azure/TestClientThrottlingAnalyzer.java  | 177 +++++++++++
 8 files changed, 1282 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index c66dabf..4d9495e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -1524,7 +1524,7 @@ public class ContractTestUtils extends Assert {
    * printing some useful results in the process.
    */
   public static final class NanoTimer {
-    private final long startTime;
+    private long startTime;
     private long endTime;
 
     public NanoTimer() {
@@ -1532,6 +1532,14 @@ public class ContractTestUtils extends Assert {
     }
 
     /**
+     * Reset the timer.  Equivalent to the reset button of a stopwatch.
+     */
+    public void reset() {
+      endTime = 0;
+      startTime = now();
+    }
+
+    /**
      * End the operation.
      * @return the duration of the operation
      */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 7c198af..554027b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -158,6 +158,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private static final String KEY_SELF_THROTTLE_READ_FACTOR = "fs.azure.selfthrottling.read.factor";
   private static final String KEY_SELF_THROTTLE_WRITE_FACTOR = "fs.azure.selfthrottling.write.factor";
 
+  private static final String KEY_AUTO_THROTTLE_ENABLE = "fs.azure.autothrottling.enable";
+
   private static final String KEY_ENABLE_STORAGE_CLIENT_LOGGING = "fs.azure.storage.client.logging";
 
   /**
@@ -239,10 +241,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   // Retry parameter defaults.
   //
 
-  private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 1 * 1000; // 1s
+  private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000; // 3s
   private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
-  private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s
-  private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 15;
+  private static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
+  private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
 
   private static final int DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL = 3  * 1000;
   private static final int DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL = 90 * 1000;
@@ -256,6 +258,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private static final float DEFAULT_SELF_THROTTLE_READ_FACTOR = 1.0f;
   private static final float DEFAULT_SELF_THROTTLE_WRITE_FACTOR = 1.0f;
 
+  private static final boolean DEFAULT_AUTO_THROTTLE_ENABLE = false;
+
   private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
 
   /**
@@ -283,7 +287,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private boolean connectingUsingSAS = false;
   private AzureFileSystemInstrumentation instrumentation;
   private BandwidthGaugeUpdater bandwidthGaugeUpdater;
-  private final static JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
+  private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
 
   private boolean suppressRetryPolicy = false;
   private boolean canCreateOrModifyContainer = false;
@@ -308,6 +312,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private float selfThrottlingReadFactor;
   private float selfThrottlingWriteFactor;
 
+  private boolean autoThrottlingEnabled;
+
   private TestHookOperationContext testHookOperationContext = null;
 
   // Set if we're running against a storage emulator..
@@ -481,7 +487,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
           "Cannot initialize WASB file system, conf is null");
     }
 
-    if(!conf.getBoolean(
+    if (!conf.getBoolean(
         NativeAzureFileSystem.SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
       //If not skip azure metrics, create bandwidthGaugeUpdater
       this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation);
@@ -664,9 +670,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private String getHTTPScheme() {
     String sessionScheme = sessionUri.getScheme();
     // Check if we're on a secure URI scheme: wasbs or the legacy asvs scheme.
-    if (sessionScheme != null &&
-        (sessionScheme.equalsIgnoreCase("asvs") ||
-         sessionScheme.equalsIgnoreCase("wasbs"))) {
+    if (sessionScheme != null
+        && (sessionScheme.equalsIgnoreCase("asvs")
+         || sessionScheme.equalsIgnoreCase("wasbs"))) {
       return HTTPS_SCHEME;
     } else {
       // At this point the scheme should be either null or asv or wasb.
@@ -766,6 +772,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     selfThrottlingWriteFactor = sessionConfiguration.getFloat(
         KEY_SELF_THROTTLE_WRITE_FACTOR, DEFAULT_SELF_THROTTLE_WRITE_FACTOR);
 
+    if (!selfThrottlingEnabled) {
+      autoThrottlingEnabled = sessionConfiguration.getBoolean(
+          KEY_AUTO_THROTTLE_ENABLE,
+          DEFAULT_AUTO_THROTTLE_ENABLE);
+      if (autoThrottlingEnabled) {
+        ClientThrottlingIntercept.initializeSingleton();
+      }
+    } else {
+      // cannot enable both self-throttling and client-throttling
+      autoThrottlingEnabled = false;
+    }
+
     OperationContext.setLoggingEnabledByDefault(sessionConfiguration.
         getBoolean(KEY_ENABLE_STORAGE_CLIENT_LOGGING, false));
 
@@ -839,8 +857,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
           CloudStorageAccount.getDevelopmentStorageAccount();
       storageInteractionLayer.createBlobClient(account);
     } else {
-      blobEndPoint = new URI(getHTTPScheme() + "://" +
-          accountName);
+      blobEndPoint = new URI(getHTTPScheme() + "://" + accountName);
       storageInteractionLayer.createBlobClient(blobEndPoint, credentials);
     }
     suppressRetryPolicyInClientIfNeeded();
@@ -951,7 +968,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    * @throws AzureException
    * @throws IOException
    */
-  private void createAzureStorageSession ()
+  private void createAzureStorageSession()
       throws AzureException, IOException {
 
     // Make sure this object was properly initialized with references to
@@ -1128,8 +1145,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         myDir = verifyAndConvertToStandardFormat(currentDir);
       } catch (URISyntaxException ex) {
         throw new AzureException(String.format(
-            "The directory %s specified in the configuration entry %s is not" +
-            " a valid URI.",
+            "The directory %s specified in the configuration entry %s is not"
+            + " a valid URI.",
             currentDir, configVar));
       }
       if (myDir != null) {
@@ -1159,8 +1176,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   public boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
     String defaultFS = FileSystem.getDefaultUri(sessionConfiguration).toString();
     for (String dir : dirSet) {
-      if (dir.isEmpty() ||
-          key.startsWith(dir + "/")) {
+      if (dir.isEmpty() || key.startsWith(dir + "/")) {
         return true;
       }
 
@@ -1168,7 +1184,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       // system.
       //
       try {
-        URI uriPageBlobDir = new URI (dir);
+        URI uriPageBlobDir = new URI(dir);
         if (null == uriPageBlobDir.getAuthority()) {
           // Concatenate the default file system prefix with the relative
           // page blob directory path.
@@ -1424,7 +1440,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       throws StorageException {
     if (blob instanceof CloudPageBlobWrapper){
       return new PageBlobOutputStream(
-          (CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration);
+          (CloudPageBlobWrapper) blob, getInstrumentedContext(), sessionConfiguration);
     } else {
 
       // Handle both ClouldBlockBlobWrapperImpl and (only for the test code path)
@@ -1739,12 +1755,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata,
       boolean useFlatBlobListing) throws StorageException, URISyntaxException {
     return rootDirectory.listBlobs(
-        null, useFlatBlobListing,
-        includeMetadata ?
-            EnumSet.of(BlobListingDetails.METADATA) :
-              EnumSet.noneOf(BlobListingDetails.class),
         null,
-              getInstrumentedContext());
+        useFlatBlobListing,
+        includeMetadata
+            ? EnumSet.of(BlobListingDetails.METADATA)
+            : EnumSet.noneOf(BlobListingDetails.class),
+        null,
+        getInstrumentedContext());
   }
 
   /**
@@ -1771,11 +1788,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
     Iterable<ListBlobItem> list = rootDirectory.listBlobs(aPrefix,
         useFlatBlobListing,
-        includeMetadata ?
-            EnumSet.of(BlobListingDetails.METADATA) :
-              EnumSet.noneOf(BlobListingDetails.class),
-              null,
-              getInstrumentedContext());
+        includeMetadata
+            ? EnumSet.of(BlobListingDetails.METADATA)
+            : EnumSet.noneOf(BlobListingDetails.class),
+        null,
+        getInstrumentedContext());
     return list;
   }
 
@@ -1941,9 +1958,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     if (selfThrottlingEnabled) {
       SelfThrottlingIntercept.hook(operationContext, selfThrottlingReadFactor,
           selfThrottlingWriteFactor);
+    } else if (autoThrottlingEnabled) {
+      ClientThrottlingIntercept.hook(operationContext);
     }
 
-    if(bandwidthGaugeUpdater != null) {
+    if (bandwidthGaugeUpdater != null) {
       //bandwidthGaugeUpdater is null when we config to skip azure metrics
       ResponseReceivedMetricUpdater.hook(
          operationContext,
@@ -2446,10 +2465,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       // 1. It's a BlobNotFound exception AND
       // 2. It got there after one-or-more retries THEN
       // we swallow the exception.
-      if (e.getErrorCode() != null &&
-          "BlobNotFound".equals(e.getErrorCode()) &&
-          operationContext.getRequestResults().size() > 1 &&
-          operationContext.getRequestResults().get(0).getException() != null) {
+      if (e.getErrorCode() != null
+          && "BlobNotFound".equals(e.getErrorCode())
+          && operationContext.getRequestResults().size() > 1
+          && operationContext.getRequestResults().get(0).getException() != null) {
         LOG.debug("Swallowing delete exception on retry: {}", e.getMessage());
         return;
       } else {
@@ -2496,7 +2515,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       return delete(key, null);
     } catch (IOException e) {
       Throwable t = e.getCause();
-      if(t != null && t instanceof StorageException) {
+      if (t != null && t instanceof StorageException) {
         StorageException se = (StorageException) t;
         if ("LeaseIdMissing".equals(se.getErrorCode())){
           SelfRenewingLease lease = null;
@@ -2509,7 +2528,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
             throw e3;
           } finally {
             try {
-              if(lease != null){
+              if (lease != null){
                 lease.free();
               }
             } catch (Exception e4){
@@ -2561,8 +2580,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
       srcBlob = getBlobReference(srcKey);
       if (!srcBlob.exists(getInstrumentedContext())) {
-        throw new AzureException ("Source blob " + srcKey +
-            " does not exist.");
+        throw new AzureException("Source blob " + srcKey + " does not exist.");
       }
 
       /**
@@ -2600,19 +2618,19 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
           int copyBlobMinBackoff = sessionConfiguration.getInt(
             KEY_COPYBLOB_MIN_BACKOFF_INTERVAL,
-			DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL);
+            DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL);
 
           int copyBlobMaxBackoff = sessionConfiguration.getInt(
             KEY_COPYBLOB_MAX_BACKOFF_INTERVAL,
-			DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL);
+            DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL);
 
           int copyBlobDeltaBackoff = sessionConfiguration.getInt(
             KEY_COPYBLOB_BACKOFF_INTERVAL,
-			DEFAULT_COPYBLOB_BACKOFF_INTERVAL);
+            DEFAULT_COPYBLOB_BACKOFF_INTERVAL);
 
           int copyBlobMaxRetries = sessionConfiguration.getInt(
             KEY_COPYBLOB_MAX_IO_RETRIES,
-			DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS);
+            DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS);
 
           BlobRequestOptions options = new BlobRequestOptions();
           options.setRetryPolicyFactory(new RetryExponentialRetry(
@@ -2631,7 +2649,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         InputStream ipStream = null;
         OutputStream opStream = null;
         try {
-          if(srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
+          if (srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
             ipStream = openInputStream(srcBlob);
             opStream = openOutputStream(dstBlob);
             byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
@@ -2817,7 +2835,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   @Override
   public void close() {
-    if(bandwidthGaugeUpdater != null) {
+    if (bandwidthGaugeUpdater != null) {
       bandwidthGaugeUpdater.close();
       bandwidthGaugeUpdater = null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java
new file mode 100644
index 0000000..6da64e1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.microsoft.azure.storage.Constants.HeaderConstants;
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * Determines the operation type (PutBlock, PutPage, GetBlob, etc) of Azure
+ * Storage operations.  This is used by the handlers of the SendingRequestEvent
+ * and ResponseReceivedEvent exposed by the Azure Storage SDK to identify
+ * operation types (since the type of operation is not exposed by the SDK).
+ */
+@InterfaceAudience.Private
+final class BlobOperationDescriptor {
+
+  private BlobOperationDescriptor() {
+    // static helpers only; hide the default constructor
+  }
+
+  /**
+   * Gets the content length for the Azure Storage operation from the
+   * 'x-ms-range' header, if set.
+   * @param range the value of the 'x-ms-range' header ("bytes=start-end").
+   * @return the content length, or zero if not set or not in that format.
+   */
+  private static long getContentLengthIfKnown(String range) {
+    if (range != null && range.startsWith("bytes=")) {
+      String[] offsets = range.substring("bytes=".length()).split("-");
+      try {
+        return (offsets.length != 2) ? 0
+            : Long.parseLong(offsets[1]) - Long.parseLong(offsets[0]) + 1;
+      } catch (NumberFormatException ignored) {
+        return 0; // e.g. suffix range "bytes=-500" splits into "" and "500"
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Gets the content length for the Azure Storage operation, or returns zero if
+   * unknown.  AppendBlock and PutBlock read the request header named by
+   * HeaderConstants.CONTENT_LENGTH; PutPage and GetBlob use the span of the
+   * 'x-ms-range' request header; every other operation type yields zero.
+   *
+   * @param conn the connection object for the Azure Storage operation.
+   * @param operationType the Azure Storage operation type.
+   * @return the content length, or zero if unknown.
+   */
+  static long getContentLengthIfKnown(HttpURLConnection conn,
+                                      OperationType operationType) {
+    switch (operationType) {
+      case AppendBlock:
+      case PutBlock:
+        String declaredLength = conn.getRequestProperty(
+            HeaderConstants.CONTENT_LENGTH);
+        if (declaredLength == null) {
+          return 0;
+        }
+        return Long.parseLong(declaredLength);
+      case PutPage:
+      case GetBlob:
+        // length is implied by the requested byte range
+        return getContentLengthIfKnown(conn.getRequestProperty("x-ms-range"));
+      default:
+        return 0;
+    }
+  }
+
+  /**
+   * Gets the operation type of an Azure Storage operation, inferred from the
+   * HTTP method, 'comp'/'restype' query parameters and request headers.
+   *
+   * @param conn the connection object for the Azure Storage operation.
+   * @return the operation type, or Unknown if the request is not recognized.
+   */
+  static OperationType getOperationType(HttpURLConnection conn) {
+    OperationType operationType = OperationType.Unknown;
+    String method = conn.getRequestMethod();
+    String compValue = getQueryParameter(conn.getURL(), "comp");
+
+    if (method.equalsIgnoreCase("PUT")) {
+      if (compValue != null) {
+        switch (compValue) {
+          case "metadata":
+            operationType = OperationType.SetMetadata;
+            break;
+          case "properties":
+            operationType = OperationType.SetProperties;
+            break;
+          case "block":
+            operationType = OperationType.PutBlock;
+            break;
+          case "page":
+            // only page updates are PutPage; other page writes stay Unknown
+            String pageWrite = conn.getRequestProperty("x-ms-page-write");
+            if (pageWrite != null && pageWrite.equalsIgnoreCase("UPDATE")) {
+              operationType = OperationType.PutPage;
+            }
+            break;
+          case "appendblock":
+            operationType = OperationType.AppendBlock;
+            break;
+          case "blocklist":
+            operationType = OperationType.PutBlockList;
+            break;
+          default:
+            break;
+        }
+      } else {
+        // no 'comp' parameter: may be a blob or container creation
+        String blobType = conn.getRequestProperty("x-ms-blob-type");
+        if (blobType != null
+            && (blobType.equalsIgnoreCase("PageBlob")
+            || blobType.equalsIgnoreCase("BlockBlob")
+            || blobType.equalsIgnoreCase("AppendBlob"))) {
+          operationType = OperationType.CreateBlob;
+        } else if (blobType == null) {
+          String resType = getQueryParameter(conn.getURL(), "restype");
+          if (resType != null
+              && resType.equalsIgnoreCase("container")) {
+            operationType = OperationType.CreateContainer;
+          }
+        }
+      }
+    } else if (method.equalsIgnoreCase("GET")) {
+      if (compValue != null) {
+        switch (compValue) {
+          case "list":
+            operationType = OperationType.ListBlobs;
+            break;
+          case "metadata":
+            operationType = OperationType.GetMetadata;
+            break;
+          case "blocklist":
+            operationType = OperationType.GetBlockList;
+            break;
+          case "pagelist":
+            operationType = OperationType.GetPageList;
+            break;
+          default:
+            break;
+        }
+      } else if (conn.getRequestProperty("x-ms-range") != null) {
+        // a GET without 'comp' counts as GetBlob only if a range is requested
+        operationType = OperationType.GetBlob;
+      }
+    } else if (method.equalsIgnoreCase("HEAD")) {
+      operationType = OperationType.GetProperties;
+    } else if (method.equalsIgnoreCase("DELETE")) {
+      // restype=container marks a container delete; anything else is a blob
+      String resType = getQueryParameter(conn.getURL(), "restype");
+      if (resType != null
+          && resType.equalsIgnoreCase("container")) {
+        operationType = OperationType.DeleteContainer;
+      } else {
+        operationType = OperationType.DeleteBlob;
+      }
+    }
+    return operationType;
+  }
+
+  /**
+   * Gets the value of the first query parameter with the given name, as it
+   * appears in the URL.  Names are matched whole: "comp" never matches "xcomp".
+   * @param url the URL to inspect; may be null.
+   * @param queryParameterName the name of the query parameter.
+   * @return the value, or null if there is no query or no such parameter.
+   */
+  private static String getQueryParameter(URL url, String queryParameterName) {
+    String query = (url != null) ? url.getQuery() : null;
+    if (query == null) {
+      return null;
+    }
+    String searchValue = queryParameterName + "=";
+    for (String parameter : query.split("&")) {
+      if (parameter.startsWith(searchValue)) {
+        return parameter.substring(searchValue.length());
+      }
+    }
+    return null;
+  }
+
+  @InterfaceAudience.Private
+  enum OperationType {
+    AppendBlock,
+    CreateBlob,
+    CreateContainer,
+    DeleteBlob,
+    DeleteContainer,
+    GetBlob,
+    GetBlockList,
+    GetMetadata,
+    GetPageList,
+    GetProperties,
+    ListBlobs,
+    PutBlock,
+    PutBlockList,
+    PutPage,
+    SetMetadata,
+    SetProperties,
+    Unknown // getOperationType's result when no other constant applies
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
new file mode 100644
index 0000000..aa7ac2e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Throttles storage operations to minimize errors and maximize throughput. This
+ * improves throughput by as much as 35% when the service throttles requests due
+ * to exceeding account level ingress or egress limits.
+ */
+@InterfaceAudience.Private
+class ClientThrottlingAnalyzer {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ClientThrottlingAnalyzer.class);
+  private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
+  private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
+  private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
+  private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1; // 0.1%
+  private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1; // 1%
+  private static final double RAPID_SLEEP_DECREASE_FACTOR = .75;
+  private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150
+      * 1000;
+  private static final double SLEEP_DECREASE_FACTOR = .975;
+  private static final double SLEEP_INCREASE_FACTOR = 1.05;
+  private int analysisPeriodMs;
+
+  private volatile int sleepDuration = 0; // ms, used by suspendIfNecessary
+  private long consecutiveNoErrorCount = 0;
+  private String name = null;
+  private Timer timer = null;
+  private AtomicReference<BlobOperationMetrics> blobMetrics = null;
+
+  private ClientThrottlingAnalyzer() {
+    // private: code outside this class must use a constructor taking a name
+  }
+
+  /**
+   * Creates an instance of the <code>ClientThrottlingAnalyzer</code> class with
+   * the specified name and the default analysis period of 10 seconds.
+   *
+   * @param name a name used to identify this instance.
+   *
+   * @throws IllegalArgumentException if name is null or empty.
+   */
+  ClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
+    this(name, DEFAULT_ANALYSIS_PERIOD_MS);
+  }
+
+  /**
+   * Creates an instance of the <code>ClientThrottlingAnalyzer</code> class with
+   * the specified name and period, and schedules its timer task to run every
+   * <code>period</code> milliseconds.  The timer runs on a daemon thread so
+   * that an analyzer which is never cancelled cannot keep the JVM alive.
+   *
+   * @param name A name used to identify this instance.
+   * @param period The frequency, in milliseconds, at which metrics are
+   *              analyzed.
+   * @throws IllegalArgumentException
+   *           If name is null or empty.
+   *           If period is less than 1000 or greater than 30000 milliseconds.
+   */
+  ClientThrottlingAnalyzer(String name, int period)
+      throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        StringUtils.isNotEmpty(name),
+        "The argument 'name' cannot be null or empty.");
+    Preconditions.checkArgument(
+        period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
+        "The argument 'period' must be between 1000 and 30000.");
+    this.name = name;
+    this.analysisPeriodMs = period;
+    this.blobMetrics = new AtomicReference<BlobOperationMetrics>(
+        new BlobOperationMetrics(System.currentTimeMillis()));
+    this.timer = new Timer(
+        String.format("wasb-timer-client-throttling-analyzer-%s", name), true);
+    this.timer.schedule(new TimerTaskImpl(),
+        analysisPeriodMs,
+        analysisPeriodMs);
+  }
+
+  /**
+   * Records the outcome of one storage operation in the currently active
+   * <code>BlobOperationMetrics</code>.
+   *
+   * @param count The count of bytes transferred.
+   * @param isFailedOperation True if the operation failed; otherwise false.
+   */
+  public void addBytesTransferred(long count, boolean isFailedOperation) {
+    BlobOperationMetrics current = blobMetrics.get();
+    if (!isFailedOperation) {
+      current.bytesSuccessful.addAndGet(count);
+      current.operationsSuccessful.incrementAndGet();
+      return;
+    }
+    current.bytesFailed.addAndGet(count);
+    current.operationsFailed.incrementAndGet();
+  }
+
+  /**
+   * Sleeps for the current sleep duration (if positive) to reduce throughput.
+   */
+  public void suspendIfNecessary() {
+    final int pauseMs = sleepDuration;
+    try {
+      if (pauseMs > 0) {
+        Thread.sleep(pauseMs);
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @VisibleForTesting
+  int getSleepDuration() {
+    return sleepDuration; // delay in ms that suspendIfNecessary() applies
+  }
+
+  /**
+   * Analyzes the metrics gathered during one analysis period and computes the
+   * sleep duration to apply to subsequent storage operations.
+   *
+   * <p>The error percentage is the share of failed bytes among all bytes
+   * transferred during the period.  When it is below
+   * MIN_ACCEPTABLE_ERROR_PERCENTAGE the sleep duration is multiplied by a
+   * reduction factor; when it is below MAX_EQUILIBRIUM_ERROR_PERCENTAGE the
+   * sleep duration is left unchanged; otherwise it is increased, but never
+   * beyond analysisPeriodMs.  As a side effect this method maintains
+   * consecutiveNoErrorCount.
+   *
+   * @param metrics The metrics for the period being analyzed.
+   * @param sleepDuration The sleep duration, in milliseconds, currently in
+   *                      effect.
+   * @return The new sleep duration, in milliseconds, truncated to an int.
+   */
+  private int analyzeMetricsAndUpdateSleepDuration(BlobOperationMetrics metrics,
+                                                   int sleepDuration) {
+    final double percentageConversionFactor = 100;
+    double bytesFailed = metrics.bytesFailed.get();
+    double bytesSuccessful = metrics.bytesSuccessful.get();
+    double operationsFailed = metrics.operationsFailed.get();
+    double operationsSuccessful = metrics.operationsSuccessful.get();
+    double errorPercentage = (bytesFailed <= 0)
+        ? 0
+        : percentageConversionFactor
+        * bytesFailed
+        / (bytesFailed + bytesSuccessful);
+    long periodMs = metrics.endTime - metrics.startTime;
+
+    double newSleepDuration;
+
+    if (errorPercentage < MIN_ACCEPTABLE_ERROR_PERCENTAGE) {
+      ++consecutiveNoErrorCount;
+      // Decrease sleepDuration in order to increase throughput.  Once the
+      // run of periods below the minimum error percentage has lasted long
+      // enough, switch to the rapid decrease factor.
+      double reductionFactor =
+          (consecutiveNoErrorCount * analysisPeriodMs
+              >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
+              ? RAPID_SLEEP_DECREASE_FACTOR
+              : SLEEP_DECREASE_FACTOR;
+
+      newSleepDuration = sleepDuration * reductionFactor;
+    } else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) {
+      // Do not modify sleepDuration in order to stabilize throughput.
+      newSleepDuration = sleepDuration;
+    } else {
+      consecutiveNoErrorCount = 0;
+
+      // Increase sleep duration in order to reduce throughput and error rate.
+      // First, calculate target throughput: bytesSuccessful / periodMs.
+      // Next, calculate time required to send *all* data (assuming next period
+      // is similar to previous) at the target throughput: (bytesSuccessful
+      // + bytesFailed) * periodMs / bytesSuccessful. Next, subtract periodMs to
+      // get the total additional delay needed.
+      // When nothing succeeded the target throughput is undefined, so a fixed
+      // multiple of the analysis period is used instead.
+      double additionalDelayNeeded = 5 * analysisPeriodMs;
+      if (bytesSuccessful > 0) {
+        additionalDelayNeeded = (bytesSuccessful + bytesFailed)
+            * periodMs
+            / bytesSuccessful
+            - periodMs;
+      }
+
+      // amortize the additional delay needed across the estimated number of
+      // requests during the next period
+      newSleepDuration = additionalDelayNeeded
+          / (operationsFailed + operationsSuccessful);
+
+      final double maxSleepDuration = analysisPeriodMs;
+      final double minSleepDuration = sleepDuration * SLEEP_INCREASE_FACTOR;
+
+      // Add 1 ms to avoid rounding down and to decrease proximity to the server
+      // side ingress/egress limit.  Ensure that the new sleep duration is
+      // larger than the current one to more quickly reduce the number of
+      // errors.  Don't allow the sleep duration to grow unbounded, after a
+      // certain point throttling won't help, for example, if there are far too
+      // many tasks/containers/nodes no amount of throttling will help.
+      newSleepDuration = Math.max(newSleepDuration, minSleepDuration) + 1;
+      newSleepDuration = Math.min(newSleepDuration, maxSleepDuration);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // The counters are longs held in doubles; cast to long (not int) so
+      // that values above Integer.MAX_VALUE are logged correctly.
+      LOG.debug(String.format(
+          "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
+          name,
+          (long) bytesFailed,
+          (long) bytesSuccessful,
+          (long) operationsFailed,
+          (long) operationsSuccessful,
+          errorPercentage,
+          periodMs,
+          sleepDuration,
+          (int) newSleepDuration));
+    }
+
+    return (int) newSleepDuration;
+  }
+
+  /**
+   * Timer task that periodically analyzes the collected metrics and
+   * recomputes the sleep duration.
+   */
+  class TimerTaskImpl extends TimerTask {
+    // 1 while an invocation of run() is doing the analysis; otherwise 0.
+    private final AtomicInteger doingWork = new AtomicInteger(0);
+
+    /**
+     * Once at least one analysis period has elapsed since the current metrics
+     * were started, replaces them with a fresh metrics object and updates
+     * sleepDuration from the metrics that were just retired.
+     */
+    @Override
+    public void run() {
+      // Prevent concurrent execution of this task: an invocation that does
+      // not win the flag returns without resetting it.
+      if (!doingWork.compareAndSet(0, 1)) {
+        return;
+      }
+
+      try {
+        final long currentTimeMs = System.currentTimeMillis();
+        final long elapsedMs = currentTimeMs - blobMetrics.get().startTime;
+        if (elapsedMs < analysisPeriodMs) {
+          return;
+        }
+
+        // Start a new period and analyze the one that just ended.
+        final BlobOperationMetrics completed = blobMetrics.getAndSet(
+            new BlobOperationMetrics(currentTimeMs));
+        completed.endTime = currentTimeMs;
+        sleepDuration = analyzeMetricsAndUpdateSleepDuration(completed,
+            sleepDuration);
+      } finally {
+        doingWork.set(0);
+      }
+    }
+  }
+
+  /**
+   * Holds the byte and operation counters, together with the start and end
+   * times, for a single analysis period.
+   */
+  static class BlobOperationMetrics {
+    // Counters start at zero for every new period.
+    private AtomicLong bytesFailed = new AtomicLong();
+    private AtomicLong bytesSuccessful = new AtomicLong();
+    private AtomicLong operationsFailed = new AtomicLong();
+    private AtomicLong operationsSuccessful = new AtomicLong();
+    private long startTime;
+    // Left at zero here; assigned by the timer task when the period ends.
+    private long endTime;
+
+    BlobOperationMetrics(long startTime) {
+      this.startTime = startTime;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java
new file mode 100644
index 0000000..9da993b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.microsoft.azure.storage.ErrorReceivingResponseEvent;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.RequestResult;
+import com.microsoft.azure.storage.ResponseReceivedEvent;
+import com.microsoft.azure.storage.SendingRequestEvent;
+import com.microsoft.azure.storage.StorageEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.net.HttpURLConnection;
+
+/**
+ * Throttles Azure Storage read and write operations to achieve maximum
+ * throughput by minimizing errors.  The errors occur when the account ingress
+ * or egress limits are exceeded and the server-side throttles requests.
+ * Server-side throttling causes the retry policy to be used, but the retry
+ * policy sleeps for long periods of time causing the total ingress or egress
+ * throughput to be as much as 35% lower than optimal.  The retry policy is also
+ * after the fact, in that it applies after a request fails.  On the other hand,
+ * the client-side throttling implemented here happens before requests are made
+ * and sleeps just enough to minimize errors, allowing optimal ingress and/or
+ * egress throughput.
+ *
+ * <p>{@link #initializeSingleton()} must be called before any listener
+ * registered by {@link #hook(OperationContext)} fires, because the listeners
+ * dereference the singleton.
+ */
+@InterfaceAudience.Private
+final class ClientThrottlingIntercept {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ClientThrottlingIntercept.class);
+
+  // Written under the class lock in initializeSingleton() but read without
+  // locking by the static callbacks below, hence volatile.
+  private static volatile ClientThrottlingIntercept singleton = null;
+
+  private final ClientThrottlingAnalyzer readThrottler;
+  private final ClientThrottlingAnalyzer writeThrottler;
+
+  // Hide default constructor
+  private ClientThrottlingIntercept() {
+    readThrottler = new ClientThrottlingAnalyzer("read");
+    writeThrottler = new ClientThrottlingAnalyzer("write");
+    LOG.debug("Client-side throttling is enabled for the WASB file system.");
+  }
+
+  /**
+   * Creates the singleton, with its read and write analyzers, if it has not
+   * been created yet.  Subsequent calls have no effect.
+   */
+  static synchronized void initializeSingleton() {
+    if (singleton == null) {
+      singleton = new ClientThrottlingIntercept();
+    }
+  }
+
+  /**
+   * Registers the throttling listeners on the error-receiving-response,
+   * sending-request and response-received event handlers of the given
+   * operation context.
+   *
+   * @param context The operation context to attach the listeners to.
+   */
+  static void hook(OperationContext context) {
+    context.getErrorReceivingResponseEventHandler().addListener(
+        new ErrorReceivingResponseEventHandler());
+    context.getSendingRequestEventHandler().addListener(
+        new SendingRequestEventHandler());
+    context.getResponseReceivedEventHandler().addListener(
+        new ResponseReceivedEventHandler());
+  }
+
+  /**
+   * Records the bytes transferred by a completed or failed request with the
+   * read or write analyzer, depending on the operation type.  Requests of
+   * other operation types, and requests whose content length is not
+   * positive, are not recorded.
+   *
+   * @param conn The connection used for the request.
+   * @param result The request result providing the HTTP status code.
+   */
+  private static void updateMetrics(HttpURLConnection conn,
+                                    RequestResult result) {
+    BlobOperationDescriptor.OperationType operationType
+        = BlobOperationDescriptor.getOperationType(conn);
+    int status = result.getStatusCode();
+    // If the socket is terminated prior to receiving a response, the HTTP
+    // status may be 0 or -1.  A status less than 200 or greater than or equal
+    // to 500 is considered an error.
+    boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
+        || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
+
+    final boolean isWrite;
+    switch (operationType) {
+      case AppendBlock:
+      case PutBlock:
+      case PutPage:
+        isWrite = true;
+        break;
+      case GetBlob:
+        isWrite = false;
+        break;
+      default:
+        // Other operation types are not throttled, so nothing is recorded.
+        return;
+    }
+
+    long contentLength = BlobOperationDescriptor.getContentLengthIfKnown(conn,
+        operationType);
+    if (contentLength > 0) {
+      ClientThrottlingIntercept intercept = singleton;
+      ClientThrottlingAnalyzer throttler = isWrite
+          ? intercept.writeThrottler
+          : intercept.readThrottler;
+      throttler.addBytesTransferred(contentLength, isFailedOperation);
+    }
+  }
+
+  /**
+   * Called when a network error occurs before the HTTP status and response
+   * headers are received. Client-side throttling uses this to collect metrics.
+   *
+   * @param event The connection, operation, and request state.
+   */
+  public static void errorReceivingResponse(ErrorReceivingResponseEvent event) {
+    updateMetrics((HttpURLConnection) event.getConnectionObject(),
+        event.getRequestResult());
+  }
+
+  /**
+   * Called before the Azure Storage SDK sends a request. Client-side throttling
+   * uses this to suspend the request, if necessary, to minimize errors and
+   * maximize throughput.
+   *
+   * @param event The connection, operation, and request state.
+   */
+  public static void sendingRequest(SendingRequestEvent event) {
+    BlobOperationDescriptor.OperationType operationType
+        = BlobOperationDescriptor.getOperationType(
+            (HttpURLConnection) event.getConnectionObject());
+    switch (operationType) {
+      case GetBlob:
+        singleton.readThrottler.suspendIfNecessary();
+        break;
+      case AppendBlock:
+      case PutBlock:
+      case PutPage:
+        singleton.writeThrottler.suspendIfNecessary();
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Called after the Azure Storage SDK receives a response. Client-side
+   * throttling uses this to collect metrics.
+   *
+   * @param event The connection, operation, and request state.
+   */
+  public static void responseReceived(ResponseReceivedEvent event) {
+    updateMetrics((HttpURLConnection) event.getConnectionObject(),
+        event.getRequestResult());
+  }
+
+  /**
+   * The ErrorReceivingResponseEvent is fired when the Azure Storage SDK
+   * encounters a network error before the HTTP status and response headers are
+   * received.
+   */
+  @InterfaceAudience.Private
+  static class ErrorReceivingResponseEventHandler
+      extends StorageEvent<ErrorReceivingResponseEvent> {
+
+    /**
+     * Called when a network error occurs before the HTTP status and response
+     * headers are received.  Client-side throttling uses this to collect
+     * metrics.
+     *
+     * @param event The connection, operation, and request state.
+     */
+    @Override
+    public void eventOccurred(ErrorReceivingResponseEvent event) {
+      ClientThrottlingIntercept.errorReceivingResponse(event);
+    }
+  }
+
+  /**
+   * The SendingRequestEvent is fired before the Azure Storage SDK sends a
+   * request.
+   */
+  @InterfaceAudience.Private
+  static class SendingRequestEventHandler
+      extends StorageEvent<SendingRequestEvent> {
+
+    /**
+     * Called before the Azure Storage SDK sends a request. Client-side
+     * throttling uses this to suspend the request, if necessary, to minimize
+     * errors and maximize throughput.
+     *
+     * @param event The connection, operation, and request state.
+     */
+    @Override
+    public void eventOccurred(SendingRequestEvent event) {
+      ClientThrottlingIntercept.sendingRequest(event);
+    }
+  }
+
+  /**
+   * The ResponseReceivedEvent is fired after the Azure Storage SDK receives a
+   * response.
+   */
+  @InterfaceAudience.Private
+  static class ResponseReceivedEventHandler
+      extends StorageEvent<ResponseReceivedEvent> {
+
+    /**
+     * Called after the Azure Storage SDK receives a response. Client-side
+     * throttling uses this to collect metrics.
+     *
+     * @param event The connection, operation, and request state.
+     */
+    @Override
+    public void eventOccurred(ResponseReceivedEvent event) {
+      ClientThrottlingIntercept.responseReceived(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
index 51867cd..d04a19c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
@@ -67,4 +67,8 @@ public abstract class AbstractWasbTestBase {
 
   protected abstract AzureBlobStorageTestAccount createTestAccount()
       throws Exception;
+
+  /**
+   * Returns the test account held by this test base.
+   *
+   * @return The value of the testAccount field.
+   */
+  protected AzureBlobStorageTestAccount getTestAccount() {
+    return this.testAccount;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java
new file mode 100644
index 0000000..07d4ebc
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.ResponseReceivedEvent;
+import com.microsoft.azure.storage.SendingRequestEvent;
+import com.microsoft.azure.storage.StorageEvent;
+import com.microsoft.azure.storage.blob.BlobInputStream;
+import com.microsoft.azure.storage.blob.BlobOutputStream;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.junit.Test;
+
+import java.net.HttpURLConnection;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for <code>BlobOperationDescriptor</code>.
+ */
+public class TestBlobOperationDescriptor extends AbstractWasbTestBase {
+  private BlobOperationDescriptor.OperationType lastOperationTypeReceived;
+  private BlobOperationDescriptor.OperationType lastOperationTypeSent;
+  private long lastContentLengthReceived;
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  @Test
+  public void testAppendBlockOperations() throws Exception {
+    CloudBlobContainer container = getTestAccount().getRealContainer();
+
+    OperationContext context = new OperationContext();
+    context.getResponseReceivedEventHandler().addListener(
+        new ResponseReceivedEventHandler());
+    context.getSendingRequestEventHandler().addListener(
+        new SendingRequestEventHandler());
+
+    CloudAppendBlob appendBlob = container.getAppendBlobReference(
+        "testAppendBlockOperations");
+    assertNull(lastOperationTypeSent);
+    assertNull(lastOperationTypeReceived);
+    assertEquals(0, lastContentLengthReceived);
+
+    try (
+        BlobOutputStream output
+            = appendBlob.openWriteNew(null, null, context);
+    ) {
+      assertEquals(BlobOperationDescriptor.OperationType.CreateBlob,
+          lastOperationTypeReceived);
+      assertEquals(0, lastContentLengthReceived);
+
+      String message = "this is a test";
+      output.write(message.getBytes("UTF-8"));
+      output.flush();
+      assertEquals(BlobOperationDescriptor.OperationType.AppendBlock,
+          lastOperationTypeSent);
+      assertEquals(BlobOperationDescriptor.OperationType.AppendBlock,
+          lastOperationTypeReceived);
+      assertEquals(message.length(), lastContentLengthReceived);
+    }
+  }
+
+  @Test
+  public void testPutBlockOperations() throws Exception {
+    CloudBlobContainer container = getTestAccount().getRealContainer();
+
+    OperationContext context = new OperationContext();
+    context.getResponseReceivedEventHandler().addListener(
+        new ResponseReceivedEventHandler());
+    context.getSendingRequestEventHandler().addListener(
+        new SendingRequestEventHandler());
+
+    CloudBlockBlob blockBlob = container.getBlockBlobReference(
+        "testPutBlockOperations");
+    assertNull(lastOperationTypeSent);
+    assertNull(lastOperationTypeReceived);
+    assertEquals(0, lastContentLengthReceived);
+
+    try (
+        BlobOutputStream output
+            = blockBlob.openOutputStream(null,
+            null,
+            context);
+    ) {
+      assertNull(lastOperationTypeReceived);
+      assertEquals(0, lastContentLengthReceived);
+
+      String message = "this is a test";
+      output.write(message.getBytes("UTF-8"));
+      output.flush();
+      assertEquals(BlobOperationDescriptor.OperationType.PutBlock,
+          lastOperationTypeSent);
+      assertEquals(BlobOperationDescriptor.OperationType.PutBlock,
+          lastOperationTypeReceived);
+      assertEquals(message.length(), lastContentLengthReceived);
+    }
+    assertEquals(BlobOperationDescriptor.OperationType.PutBlockList,
+        lastOperationTypeSent);
+    assertEquals(BlobOperationDescriptor.OperationType.PutBlockList,
+        lastOperationTypeReceived);
+    assertEquals(0, lastContentLengthReceived);
+  }
+
+  @Test
+  public void testPutPageOperations() throws Exception {
+    CloudBlobContainer container = getTestAccount().getRealContainer();
+
+    OperationContext context = new OperationContext();
+    context.getResponseReceivedEventHandler().addListener(
+        new ResponseReceivedEventHandler());
+    context.getSendingRequestEventHandler().addListener(
+        new SendingRequestEventHandler());
+
+    CloudPageBlob pageBlob = container.getPageBlobReference(
+        "testPutPageOperations");
+    assertNull(lastOperationTypeSent);
+    assertNull(lastOperationTypeReceived);
+    assertEquals(0, lastContentLengthReceived);
+
+    try (
+        BlobOutputStream output = pageBlob.openWriteNew(1024,
+            null,
+            null,
+            context);
+    ) {
+      assertEquals(BlobOperationDescriptor.OperationType.CreateBlob,
+          lastOperationTypeReceived);
+      assertEquals(0, lastContentLengthReceived);
+
+      final int pageSize = 512;
+      byte[] buffer = new byte[pageSize];
+      output.write(buffer);
+      output.flush();
+      assertEquals(BlobOperationDescriptor.OperationType.PutPage,
+          lastOperationTypeSent);
+      assertEquals(BlobOperationDescriptor.OperationType.PutPage,
+          lastOperationTypeReceived);
+      assertEquals(buffer.length, lastContentLengthReceived);
+    }
+  }
+
+  @Test
+  public void testGetBlobOperations() throws Exception {
+    CloudBlobContainer container = getTestAccount().getRealContainer();
+
+    OperationContext context = new OperationContext();
+    context.getResponseReceivedEventHandler().addListener(
+        new ResponseReceivedEventHandler());
+    context.getSendingRequestEventHandler().addListener(
+        new SendingRequestEventHandler());
+
+    CloudBlockBlob blockBlob = container.getBlockBlobReference(
+        "testGetBlobOperations");
+    assertNull(lastOperationTypeSent);
+    assertNull(lastOperationTypeReceived);
+    assertEquals(0, lastContentLengthReceived);
+
+    String message = "this is a test";
+
+    try (
+        BlobOutputStream output = blockBlob.openOutputStream(null,
+            null,
+            context);
+    ) {
+      assertNull(lastOperationTypeReceived);
+      assertEquals(0, lastContentLengthReceived);
+
+      output.write(message.getBytes("UTF-8"));
+      output.flush();
+      assertEquals(BlobOperationDescriptor.OperationType.PutBlock,
+          lastOperationTypeSent);
+      assertEquals(BlobOperationDescriptor.OperationType.PutBlock,
+          lastOperationTypeReceived);
+      assertEquals(message.length(), lastContentLengthReceived);
+    }
+    assertEquals(BlobOperationDescriptor.OperationType.PutBlockList,
+        lastOperationTypeSent);
+    assertEquals(BlobOperationDescriptor.OperationType.PutBlockList,
+        lastOperationTypeReceived);
+    assertEquals(0, lastContentLengthReceived);
+
+    try (
+        BlobInputStream input = blockBlob.openInputStream(null,
+            null,
+            context);
+    ) {
+      assertEquals(BlobOperationDescriptor.OperationType.GetProperties,
+          lastOperationTypeSent);
+      assertEquals(BlobOperationDescriptor.OperationType.GetProperties,
+          lastOperationTypeReceived);
+      assertEquals(0, lastContentLengthReceived);
+
+      byte[] buffer = new byte[1024];
+      int numBytesRead = input.read(buffer);
+      assertEquals(BlobOperationDescriptor.OperationType.GetBlob,
+          lastOperationTypeSent);
+      assertEquals(BlobOperationDescriptor.OperationType.GetBlob,
+          lastOperationTypeReceived);
+      assertEquals(message.length(), lastContentLengthReceived);
+      assertEquals(numBytesRead, lastContentLengthReceived);
+    }
+  }
+
+  /**
+   * Called after the Azure Storage SDK receives a response.
+   *
+   * @param event The connection, operation, and request state.
+   */
+  private void responseReceived(ResponseReceivedEvent event) {
+    HttpURLConnection conn = (HttpURLConnection) event.getConnectionObject();
+    BlobOperationDescriptor.OperationType operationType
+        = BlobOperationDescriptor.getOperationType(conn);
+    lastOperationTypeReceived = operationType;
+
+    switch (operationType) {
+      case AppendBlock:
+      case PutBlock:
+      case PutPage:
+        lastContentLengthReceived
+            = BlobOperationDescriptor.getContentLengthIfKnown(conn,
+            operationType);
+        break;
+      case GetBlob:
+        lastContentLengthReceived
+            = BlobOperationDescriptor.getContentLengthIfKnown(conn,
+            operationType);
+        break;
+      default:
+        lastContentLengthReceived = 0;
+        break;
+    }
+  }
+
+  /**
+   * Called before the Azure Storage SDK sends a request.
+   *
+   * @param event The connection, operation, and request state.
+   */
+  private void sendingRequest(SendingRequestEvent event) {
+    this.lastOperationTypeSent
+        = BlobOperationDescriptor.getOperationType(
+            (HttpURLConnection) event.getConnectionObject());
+  }
+
+  /**
+   * The ResponseReceivedEvent is fired after the Azure Storage SDK receives a
+   * response.
+   */
+  @InterfaceAudience.Private
+  class ResponseReceivedEventHandler
+      extends StorageEvent<ResponseReceivedEvent> {
+
+    /**
+     * Called after the Azure Storage SDK receives a response.
+     *
+     * @param event The connection, operation, and request state.
+     */
+    @Override
+    public void eventOccurred(ResponseReceivedEvent event) {
+      responseReceived(event);
+    }
+  }
+
+  /**
+   * The SendingRequestEvent is fired before the Azure Storage SDK sends a
+   * request.
+   */
+  @InterfaceAudience.Private
+  class SendingRequestEventHandler extends StorageEvent<SendingRequestEvent> {
+
+    /**
+     * Called before the Azure Storage SDK sends a request.
+     *
+     * @param event The connection, operation, and request state.
+     */
+    @Override
+    public void eventOccurred(SendingRequestEvent event) {
+      sendingRequest(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java
new file mode 100644
index 0000000..307e5af
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for <code>ClientThrottlingAnalyzer</code>.
+ *
+ * The tests are timing-based: they feed byte counts to an analyzer, sleep for
+ * real wall-clock intervals, and then check the resulting sleep duration.
+ */
+public class TestClientThrottlingAnalyzer {
+  /** Period given to every analyzer; the tests sleep on it as milliseconds. */
+  private static final int ANALYSIS_PERIOD = 1000;
+  /** Sleep interval that is ten percent longer than one analysis period. */
+  private static final int ANALYSIS_PERIOD_PLUS_10_PERCENT = ANALYSIS_PERIOD
+      + ANALYSIS_PERIOD / 10;
+  /** Number of bytes in one megabyte. */
+  private static final long MEGABYTE = 1024 * 1024;
+  /** Tolerance, in percent, used by the fuzzy elapsed-time checks. */
+  private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
+
+  /**
+   * Sleeps for the given time.  If the thread is interrupted the sleep ends
+   * early and the interrupt status of the thread is restored.
+   *
+   * @param milliseconds how long to sleep, in milliseconds.
+   */
+  private void sleep(long milliseconds) {
+    try {
+      Thread.sleep(milliseconds);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Asserts that <code>actual</code> lies within <code>percentage</code>
+   * percent of <code>expected</code>.  The lower bound is never negative.
+   *
+   * @param expected the value the measurement should be close to.
+   * @param actual the measured value.
+   * @param percentage the allowed deviation, in percent of expected.
+   */
+  private void fuzzyValidate(long expected, long actual, double percentage) {
+    // The same tolerance is applied on both sides of the expected value.
+    final double tolerance = percentage / 100 * expected;
+    final double lowerBound = Math.max(expected - tolerance, 0);
+    final double upperBound = expected + tolerance;
+
+    assertTrue(
+        String.format(
+            "The actual value %1$d is not within the expected range: "
+                + "[%2$.2f, %3$.2f].",
+            actual,
+            lowerBound,
+            upperBound),
+        actual >= lowerBound && actual <= upperBound);
+  }
+
+  /**
+   * Asserts that <code>actual</code> is exactly <code>expected</code>.
+   *
+   * @param expected the required value.
+   * @param actual the observed value.
+   */
+  private void validate(long expected, long actual) {
+    assertEquals(
+        String.format("The actual value %1$d is not the expected value %2$d.",
+            actual,
+            expected),
+        expected, actual);
+  }
+
+  /**
+   * Asserts that <code>actual</code> does not exceed
+   * <code>maxExpected</code>.  A value equal to the maximum is accepted, as
+   * the method name and the failure message state.
+   *
+   * @param maxExpected the largest acceptable value (inclusive).
+   * @param actual the observed value.
+   */
+  private void validateLessThanOrEqual(long maxExpected, long actual) {
+    assertTrue(
+        String.format(
+            "The actual value %1$d is not less than or equal to the maximum"
+            + " expected value %2$d.",
+            actual,
+            maxExpected),
+        actual <= maxExpected);
+  }
+
+  /**
+   * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
+   * never been updated.  This validates proper initialization of
+   * ClientThrottlingAnalyzer.
+   */
+  @Test
+  public void testNoMetricUpdatesThenNoWaiting() {
+    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    validate(0, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
+   * only been updated with successful requests.
+   */
+  @Test
+  public void testOnlySuccessThenNoWaiting() {
+    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    analyzer.addBytesTransferred(8 * MEGABYTE, false);
+    validate(0, analyzer.getSleepDuration());
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    validate(0, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Exercise the analyzer when the metrics have only been updated with
+   * failed requests.  The sleepDuration must not exceed a first limit after
+   * one analysis period, and must not exceed a smaller limit after ten
+   * further periods, i.e. it must not stay high over time.
+   */
+  @Test
+  public void testOnlyErrorsAndWaiting() {
+    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    analyzer.addBytesTransferred(4 * MEGABYTE, true);
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    final int expectedSleepDuration1 = 1100;
+    validateLessThanOrEqual(expectedSleepDuration1, analyzer.getSleepDuration());
+    sleep(10 * ANALYSIS_PERIOD);
+    final int expectedSleepDuration2 = 900;
+    validateLessThanOrEqual(expectedSleepDuration2, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Exercise the analyzer when the metrics have been updated with both
+   * successful and failed requests.  The time spent in suspendIfNecessary
+   * must be close to the expected value, and after ten further analysis
+   * periods the sleepDuration must not exceed a smaller limit.
+   */
+  @Test
+  public void testSuccessAndErrorsAndWaiting() {
+    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    analyzer.addBytesTransferred(8 * MEGABYTE, false);
+    analyzer.addBytesTransferred(2 * MEGABYTE, true);
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    NanoTimer timer = new NanoTimer();
+    analyzer.suspendIfNecessary();
+    final int expectedElapsedTime = 126;
+    fuzzyValidate(expectedElapsedTime,
+                  timer.elapsedTimeMs(),
+                  MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
+    sleep(10 * ANALYSIS_PERIOD);
+    final int expectedSleepDuration = 110;
+    validateLessThanOrEqual(expectedSleepDuration, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Exercise the analyzer when the metrics have been updated with many
+   * successful and failed requests.  The time spent in suspendIfNecessary
+   * must be close to the expected value, and after ten further analysis
+   * periods the sleepDuration must have dropped back to zero.
+   */
+  @Test
+  public void testManySuccessAndErrorsAndWaiting() {
+    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    final int numberOfRequests = 20;
+    for (int i = 0; i < numberOfRequests; i++) {
+      analyzer.addBytesTransferred(8 * MEGABYTE, false);
+      analyzer.addBytesTransferred(2 * MEGABYTE, true);
+    }
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    NanoTimer timer = new NanoTimer();
+    analyzer.suspendIfNecessary();
+    final int expectedElapsedTime = 7;
+    fuzzyValidate(expectedElapsedTime,
+                  timer.elapsedTimeMs(),
+                  MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
+    sleep(10 * ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org