You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/08/01 20:37:18 UTC
hadoop git commit: HADOOP-14660 wasb: improve throughput by 34% when
account limit exceeded. Contributed by Thomas Marquardt.
Repository: hadoop
Updated Branches:
refs/heads/trunk 777475983 -> 778d4edd9
HADOOP-14660 wasb: improve throughput by 34% when account limit exceeded.
Contributed by Thomas Marquardt.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/778d4edd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/778d4edd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/778d4edd
Branch: refs/heads/trunk
Commit: 778d4edd9adbe9519c3d6df65e45ddc8bb0ab2da
Parents: 7774759
Author: Steve Loughran <st...@apache.org>
Authored: Tue Aug 1 21:33:52 2017 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Aug 1 21:36:28 2017 +0100
----------------------------------------------------------------------
.../hadoop/fs/contract/ContractTestUtils.java | 10 +-
.../fs/azure/AzureNativeFileSystemStore.java | 102 ++++---
.../fs/azure/BlobOperationDescriptor.java | 222 ++++++++++++++
.../fs/azure/ClientThrottlingAnalyzer.java | 284 +++++++++++++++++
.../fs/azure/ClientThrottlingIntercept.java | 221 ++++++++++++++
.../hadoop/fs/azure/AbstractWasbTestBase.java | 4 +
.../fs/azure/TestBlobOperationDescriptor.java | 305 +++++++++++++++++++
.../fs/azure/TestClientThrottlingAnalyzer.java | 177 +++++++++++
8 files changed, 1282 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index c66dabf..4d9495e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -1524,7 +1524,7 @@ public class ContractTestUtils extends Assert {
* printing some useful results in the process.
*/
public static final class NanoTimer {
- private final long startTime;
+ private long startTime;
private long endTime;
public NanoTimer() {
@@ -1532,6 +1532,14 @@ public class ContractTestUtils extends Assert {
}
/**
+ * Reset the timer like a stopwatch: zero endTime, set startTime to now().
+ */
+ public void reset() {
+ endTime = 0;
+ startTime = now();
+ }
+
+ /**
* End the operation.
* @return the duration of the operation
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 7c198af..554027b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -158,6 +158,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final String KEY_SELF_THROTTLE_READ_FACTOR = "fs.azure.selfthrottling.read.factor";
private static final String KEY_SELF_THROTTLE_WRITE_FACTOR = "fs.azure.selfthrottling.write.factor";
+ private static final String KEY_AUTO_THROTTLE_ENABLE = "fs.azure.autothrottling.enable";
+
private static final String KEY_ENABLE_STORAGE_CLIENT_LOGGING = "fs.azure.storage.client.logging";
/**
@@ -239,10 +241,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// Retry parameter defaults.
//
- private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 1 * 1000; // 1s
+ private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000; // 3s
private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
- private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s
- private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 15;
+ private static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
+ private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
private static final int DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL = 3 * 1000;
private static final int DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL = 90 * 1000;
@@ -256,6 +258,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final float DEFAULT_SELF_THROTTLE_READ_FACTOR = 1.0f;
private static final float DEFAULT_SELF_THROTTLE_WRITE_FACTOR = 1.0f;
+ private static final boolean DEFAULT_AUTO_THROTTLE_ENABLE = false;
+
private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
/**
@@ -283,7 +287,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private boolean connectingUsingSAS = false;
private AzureFileSystemInstrumentation instrumentation;
private BandwidthGaugeUpdater bandwidthGaugeUpdater;
- private final static JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
+ private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
private boolean suppressRetryPolicy = false;
private boolean canCreateOrModifyContainer = false;
@@ -308,6 +312,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private float selfThrottlingReadFactor;
private float selfThrottlingWriteFactor;
+ private boolean autoThrottlingEnabled;
+
private TestHookOperationContext testHookOperationContext = null;
// Set if we're running against a storage emulator..
@@ -481,7 +487,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
"Cannot initialize WASB file system, conf is null");
}
- if(!conf.getBoolean(
+ if (!conf.getBoolean(
NativeAzureFileSystem.SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
//If not skip azure metrics, create bandwidthGaugeUpdater
this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation);
@@ -664,9 +670,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private String getHTTPScheme() {
String sessionScheme = sessionUri.getScheme();
// Check if we're on a secure URI scheme: wasbs or the legacy asvs scheme.
- if (sessionScheme != null &&
- (sessionScheme.equalsIgnoreCase("asvs") ||
- sessionScheme.equalsIgnoreCase("wasbs"))) {
+ if (sessionScheme != null
+ && (sessionScheme.equalsIgnoreCase("asvs")
+ || sessionScheme.equalsIgnoreCase("wasbs"))) {
return HTTPS_SCHEME;
} else {
// At this point the scheme should be either null or asv or wasb.
@@ -766,6 +772,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
selfThrottlingWriteFactor = sessionConfiguration.getFloat(
KEY_SELF_THROTTLE_WRITE_FACTOR, DEFAULT_SELF_THROTTLE_WRITE_FACTOR);
+ if (!selfThrottlingEnabled) {
+ autoThrottlingEnabled = sessionConfiguration.getBoolean(
+ KEY_AUTO_THROTTLE_ENABLE,
+ DEFAULT_AUTO_THROTTLE_ENABLE);
+ if (autoThrottlingEnabled) {
+ ClientThrottlingIntercept.initializeSingleton();
+ }
+ } else {
+ // cannot enable both self-throttling and client-throttling
+ autoThrottlingEnabled = false;
+ }
+
OperationContext.setLoggingEnabledByDefault(sessionConfiguration.
getBoolean(KEY_ENABLE_STORAGE_CLIENT_LOGGING, false));
@@ -839,8 +857,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
CloudStorageAccount.getDevelopmentStorageAccount();
storageInteractionLayer.createBlobClient(account);
} else {
- blobEndPoint = new URI(getHTTPScheme() + "://" +
- accountName);
+ blobEndPoint = new URI(getHTTPScheme() + "://" + accountName);
storageInteractionLayer.createBlobClient(blobEndPoint, credentials);
}
suppressRetryPolicyInClientIfNeeded();
@@ -951,7 +968,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* @throws AzureException
* @throws IOException
*/
- private void createAzureStorageSession ()
+ private void createAzureStorageSession()
throws AzureException, IOException {
// Make sure this object was properly initialized with references to
@@ -1128,8 +1145,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
myDir = verifyAndConvertToStandardFormat(currentDir);
} catch (URISyntaxException ex) {
throw new AzureException(String.format(
- "The directory %s specified in the configuration entry %s is not" +
- " a valid URI.",
+ "The directory %s specified in the configuration entry %s is not"
+ + " a valid URI.",
currentDir, configVar));
}
if (myDir != null) {
@@ -1159,8 +1176,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
public boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
String defaultFS = FileSystem.getDefaultUri(sessionConfiguration).toString();
for (String dir : dirSet) {
- if (dir.isEmpty() ||
- key.startsWith(dir + "/")) {
+ if (dir.isEmpty() || key.startsWith(dir + "/")) {
return true;
}
@@ -1168,7 +1184,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// system.
//
try {
- URI uriPageBlobDir = new URI (dir);
+ URI uriPageBlobDir = new URI(dir);
if (null == uriPageBlobDir.getAuthority()) {
// Concatenate the default file system prefix with the relative
// page blob directory path.
@@ -1424,7 +1440,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
throws StorageException {
if (blob instanceof CloudPageBlobWrapper){
return new PageBlobOutputStream(
- (CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration);
+ (CloudPageBlobWrapper) blob, getInstrumentedContext(), sessionConfiguration);
} else {
// Handle both ClouldBlockBlobWrapperImpl and (only for the test code path)
@@ -1739,12 +1755,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata,
boolean useFlatBlobListing) throws StorageException, URISyntaxException {
return rootDirectory.listBlobs(
- null, useFlatBlobListing,
- includeMetadata ?
- EnumSet.of(BlobListingDetails.METADATA) :
- EnumSet.noneOf(BlobListingDetails.class),
null,
- getInstrumentedContext());
+ useFlatBlobListing,
+ includeMetadata
+ ? EnumSet.of(BlobListingDetails.METADATA)
+ : EnumSet.noneOf(BlobListingDetails.class),
+ null,
+ getInstrumentedContext());
}
/**
@@ -1771,11 +1788,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
Iterable<ListBlobItem> list = rootDirectory.listBlobs(aPrefix,
useFlatBlobListing,
- includeMetadata ?
- EnumSet.of(BlobListingDetails.METADATA) :
- EnumSet.noneOf(BlobListingDetails.class),
- null,
- getInstrumentedContext());
+ includeMetadata
+ ? EnumSet.of(BlobListingDetails.METADATA)
+ : EnumSet.noneOf(BlobListingDetails.class),
+ null,
+ getInstrumentedContext());
return list;
}
@@ -1941,9 +1958,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
if (selfThrottlingEnabled) {
SelfThrottlingIntercept.hook(operationContext, selfThrottlingReadFactor,
selfThrottlingWriteFactor);
+ } else if (autoThrottlingEnabled) {
+ ClientThrottlingIntercept.hook(operationContext);
}
- if(bandwidthGaugeUpdater != null) {
+ if (bandwidthGaugeUpdater != null) {
//bandwidthGaugeUpdater is null when we config to skip azure metrics
ResponseReceivedMetricUpdater.hook(
operationContext,
@@ -2446,10 +2465,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// 1. It's a BlobNotFound exception AND
// 2. It got there after one-or-more retries THEN
// we swallow the exception.
- if (e.getErrorCode() != null &&
- "BlobNotFound".equals(e.getErrorCode()) &&
- operationContext.getRequestResults().size() > 1 &&
- operationContext.getRequestResults().get(0).getException() != null) {
+ if (e.getErrorCode() != null
+ && "BlobNotFound".equals(e.getErrorCode())
+ && operationContext.getRequestResults().size() > 1
+ && operationContext.getRequestResults().get(0).getException() != null) {
LOG.debug("Swallowing delete exception on retry: {}", e.getMessage());
return;
} else {
@@ -2496,7 +2515,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
return delete(key, null);
} catch (IOException e) {
Throwable t = e.getCause();
- if(t != null && t instanceof StorageException) {
+ if (t != null && t instanceof StorageException) {
StorageException se = (StorageException) t;
if ("LeaseIdMissing".equals(se.getErrorCode())){
SelfRenewingLease lease = null;
@@ -2509,7 +2528,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
throw e3;
} finally {
try {
- if(lease != null){
+ if (lease != null){
lease.free();
}
} catch (Exception e4){
@@ -2561,8 +2580,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
srcBlob = getBlobReference(srcKey);
if (!srcBlob.exists(getInstrumentedContext())) {
- throw new AzureException ("Source blob " + srcKey +
- " does not exist.");
+ throw new AzureException("Source blob " + srcKey + " does not exist.");
}
/**
@@ -2600,19 +2618,19 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
int copyBlobMinBackoff = sessionConfiguration.getInt(
KEY_COPYBLOB_MIN_BACKOFF_INTERVAL,
- DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL);
+ DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL);
int copyBlobMaxBackoff = sessionConfiguration.getInt(
KEY_COPYBLOB_MAX_BACKOFF_INTERVAL,
- DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL);
+ DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL);
int copyBlobDeltaBackoff = sessionConfiguration.getInt(
KEY_COPYBLOB_BACKOFF_INTERVAL,
- DEFAULT_COPYBLOB_BACKOFF_INTERVAL);
+ DEFAULT_COPYBLOB_BACKOFF_INTERVAL);
int copyBlobMaxRetries = sessionConfiguration.getInt(
KEY_COPYBLOB_MAX_IO_RETRIES,
- DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS);
+ DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS);
BlobRequestOptions options = new BlobRequestOptions();
options.setRetryPolicyFactory(new RetryExponentialRetry(
@@ -2631,7 +2649,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
InputStream ipStream = null;
OutputStream opStream = null;
try {
- if(srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
+ if (srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
ipStream = openInputStream(srcBlob);
opStream = openOutputStream(dstBlob);
byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
@@ -2817,7 +2835,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
@Override
public void close() {
- if(bandwidthGaugeUpdater != null) {
+ if (bandwidthGaugeUpdater != null) {
bandwidthGaugeUpdater.close();
bandwidthGaugeUpdater = null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java
new file mode 100644
index 0000000..6da64e1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.microsoft.azure.storage.Constants.HeaderConstants;
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * Determines the operation type (PutBlock, PutPage, GetBlob, etc) of Azure
+ * Storage operations. This is used by the handlers of the SendingRequestEvent
+ * and ResponseReceivedEvent exposed by the Azure Storage SDK to identify
+ * operation types (since the type of operation is not exposed by the SDK).
+ */
+@InterfaceAudience.Private
+final class BlobOperationDescriptor {
+
+  /** Request header holding the byte range of a page write or blob read. */
+  private static final String RANGE_HEADER = "x-ms-range";
+
+  /** Prefix of an 'x-ms-range' header value, as in "bytes=0-511". */
+  private static final String RANGE_PREFIX = "bytes=";
+
+  private BlobOperationDescriptor() {
+    // hide default constructor
+  }
+
+  /**
+   * Gets the content length for the Azure Storage operation from the
+   * 'x-ms-range' header, if set.
+   *
+   * <p>The expected format is {@code bytes=<first>-<last>}, where both
+   * offsets are inclusive. Missing, open-ended, malformed or inverted ranges
+   * yield zero instead of an exception, so a bad header cannot make the
+   * caller fail.
+   *
+   * @param range the value of the 'x-ms-range' header; may be null.
+   * @return the content length, or zero if not set or not parseable.
+   */
+  private static long getContentLengthIfKnown(String range) {
+    if (range == null || !range.startsWith(RANGE_PREFIX)) {
+      return 0;
+    }
+    String[] offsets = range.substring(RANGE_PREFIX.length()).split("-");
+    if (offsets.length != 2) {
+      return 0;
+    }
+    try {
+      long first = Long.parseLong(offsets[0]);
+      long last = Long.parseLong(offsets[1]);
+      return (last >= first) ? last - first + 1 : 0;
+    } catch (NumberFormatException e) {
+      // e.g. a suffix range such as "bytes=-512", or a non-numeric offset
+      return 0;
+    }
+  }
+
+  /**
+   * Gets the content length for the Azure Storage operation, or returns zero if
+   * unknown.
+   *
+   * <p>{@code AppendBlock} and {@code PutBlock} use the
+   * {@code HeaderConstants.CONTENT_LENGTH} request header; {@code PutPage} and
+   * {@code GetBlob} use the 'x-ms-range' header. Every other operation type
+   * reports zero.
+   *
+   * @param conn the connection object for the Azure Storage operation.
+   * @param operationType the Azure Storage operation type.
+   * @return the content length, or zero if unknown.
+   */
+  static long getContentLengthIfKnown(HttpURLConnection conn,
+      OperationType operationType) {
+    switch (operationType) {
+    case AppendBlock:
+    case PutBlock:
+      return parseLengthIfKnown(
+          conn.getRequestProperty(HeaderConstants.CONTENT_LENGTH));
+    case PutPage:
+    case GetBlob:
+      return getContentLengthIfKnown(conn.getRequestProperty(RANGE_HEADER));
+    default:
+      return 0;
+    }
+  }
+
+  /**
+   * Parses a length header value.
+   *
+   * @param value the header value; may be null.
+   * @return the parsed length, or zero if null, malformed or negative.
+   */
+  private static long parseLengthIfKnown(String value) {
+    if (value == null) {
+      return 0;
+    }
+    try {
+      return Math.max(Long.parseLong(value), 0L);
+    } catch (NumberFormatException e) {
+      return 0;
+    }
+  }
+
+  /**
+   * Gets the operation type of an Azure Storage operation.
+   *
+   * <p>The type is inferred from the HTTP method, the 'comp' and 'restype'
+   * query parameters and a few 'x-ms-*' request headers.
+   *
+   * @param conn the connection object for the Azure Storage operation.
+   * @return the operation type, or {@link OperationType#Unknown} if the
+   * request does not match any recognized pattern.
+   */
+  static OperationType getOperationType(HttpURLConnection conn) {
+    String method = conn.getRequestMethod();
+    String compValue = getQueryParameter(conn.getURL(), "comp");
+
+    if (method.equalsIgnoreCase("PUT")) {
+      return getPutOperationType(conn, compValue);
+    } else if (method.equalsIgnoreCase("GET")) {
+      return getGetOperationType(conn, compValue);
+    } else if (method.equalsIgnoreCase("HEAD")) {
+      return OperationType.GetProperties;
+    } else if (method.equalsIgnoreCase("DELETE")) {
+      return isContainerRequest(conn)
+          ? OperationType.DeleteContainer
+          : OperationType.DeleteBlob;
+    }
+    return OperationType.Unknown;
+  }
+
+  /**
+   * Classifies a PUT request by its 'comp' query parameter or, when that is
+   * absent, by its 'x-ms-blob-type' header and 'restype' query parameter.
+   */
+  private static OperationType getPutOperationType(HttpURLConnection conn,
+      String compValue) {
+    if (compValue == null) {
+      String blobType = conn.getRequestProperty("x-ms-blob-type");
+      if (blobType == null) {
+        return isContainerRequest(conn)
+            ? OperationType.CreateContainer
+            : OperationType.Unknown;
+      }
+      boolean isKnownBlobType = blobType.equalsIgnoreCase("PageBlob")
+          || blobType.equalsIgnoreCase("BlockBlob")
+          || blobType.equalsIgnoreCase("AppendBlob");
+      return isKnownBlobType
+          ? OperationType.CreateBlob
+          : OperationType.Unknown;
+    }
+    switch (compValue) {
+    case "metadata":
+      return OperationType.SetMetadata;
+    case "properties":
+      return OperationType.SetProperties;
+    case "block":
+      return OperationType.PutBlock;
+    case "page": {
+      // Only an 'x-ms-page-write: UPDATE' request is classified as PutPage.
+      String pageWrite = conn.getRequestProperty("x-ms-page-write");
+      return (pageWrite != null && pageWrite.equalsIgnoreCase("UPDATE"))
+          ? OperationType.PutPage
+          : OperationType.Unknown;
+    }
+    case "appendblock":
+      return OperationType.AppendBlock;
+    case "blocklist":
+      return OperationType.PutBlockList;
+    default:
+      return OperationType.Unknown;
+    }
+  }
+
+  /**
+   * Classifies a GET request by its 'comp' query parameter or, when that is
+   * absent, by the presence of an 'x-ms-range' header.
+   */
+  private static OperationType getGetOperationType(HttpURLConnection conn,
+      String compValue) {
+    if (compValue == null) {
+      return (conn.getRequestProperty(RANGE_HEADER) != null)
+          ? OperationType.GetBlob
+          : OperationType.Unknown;
+    }
+    switch (compValue) {
+    case "list":
+      return OperationType.ListBlobs;
+    case "metadata":
+      return OperationType.GetMetadata;
+    case "blocklist":
+      return OperationType.GetBlockList;
+    case "pagelist":
+      return OperationType.GetPageList;
+    default:
+      return OperationType.Unknown;
+    }
+  }
+
+  /**
+   * Returns true if the request's 'restype' query parameter is 'container'
+   * (compared case-insensitively).
+   */
+  private static boolean isContainerRequest(HttpURLConnection conn) {
+    String resType = getQueryParameter(conn.getURL(), "restype");
+    return resType != null && resType.equalsIgnoreCase("container");
+  }
+
+  /**
+   * Returns the value of a query parameter, without URL-decoding it.
+   *
+   * <p>The parameter name must match exactly, so looking up "comp" does not
+   * pick up a parameter such as "xcomp". If the name occurs more than once,
+   * the first occurrence wins.
+   *
+   * @param url the request URL; may be null.
+   * @param queryParameterName the name of the parameter to look up.
+   * @return the parameter value, or null if the URL has no query or the
+   * parameter is not present.
+   */
+  private static String getQueryParameter(URL url, String queryParameterName) {
+    String query = (url != null) ? url.getQuery() : null;
+    if (query == null) {
+      return null;
+    }
+    String searchValue = queryParameterName + "=";
+    for (String pair : query.split("&")) {
+      if (pair.startsWith(searchValue)) {
+        return pair.substring(searchValue.length());
+      }
+    }
+    return null;
+  }
+
+  @InterfaceAudience.Private
+  enum OperationType {
+    AppendBlock,
+    CreateBlob,
+    CreateContainer,
+    DeleteBlob,
+    DeleteContainer,
+    GetBlob,
+    GetBlockList,
+    GetMetadata,
+    GetPageList,
+    GetProperties,
+    ListBlobs,
+    PutBlock,
+    PutBlockList,
+    PutPage,
+    SetMetadata,
+    SetProperties,
+    Unknown
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
new file mode 100644
index 0000000..aa7ac2e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Throttles storage operations to minimize errors and maximize throughput.
+ * This improves throughput by as much as 35% when the service throttles
+ * requests due to exceeding account level ingress or egress limits.
+ *
+ * <p>Results of storage operations are recorded with
+ * {@link #addBytesTransferred}; {@link #suspendIfNecessary()} sleeps for the
+ * current per-operation delay. A daemon timer periodically analyzes the
+ * metrics of the elapsed period and recomputes that delay.
+ */
+@InterfaceAudience.Private
+class ClientThrottlingAnalyzer {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ClientThrottlingAnalyzer.class);
+  private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
+  private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
+  private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
+  private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
+  private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1;
+  private static final double RAPID_SLEEP_DECREASE_FACTOR = .75;
+  private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150
+      * 1000;
+  private static final double SLEEP_DECREASE_FACTOR = .975;
+  private static final double SLEEP_INCREASE_FACTOR = 1.05;
+
+  /** Name used for the timer thread and in debug output. */
+  private final String name;
+  /** Interval, in milliseconds, between two analyses of the metrics. */
+  private final int analysisPeriodMs;
+  /** Metrics of the current period; replaced by the timer task. */
+  private final AtomicReference<BlobOperationMetrics> blobMetrics;
+  /** Timer running the periodic analysis; this class never cancels it. */
+  private final Timer timer;
+
+  /** Current per-operation delay in milliseconds; updated by the timer. */
+  private volatile int sleepDuration = 0;
+  /**
+   * Periods with an error percentage below MIN_ACCEPTABLE_ERROR_PERCENTAGE
+   * since the last one at or above MAX_EQUILIBRIUM_ERROR_PERCENTAGE.
+   */
+  private long consecutiveNoErrorCount = 0;
+
+  /**
+   * Creates an instance of the <code>ClientThrottlingAnalyzer</code> class with
+   * the specified name.
+   *
+   * @param name a name used to identify this instance.
+   *
+   * @throws IllegalArgumentException if name is null or empty.
+   */
+  ClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
+    this(name, DEFAULT_ANALYSIS_PERIOD_MS);
+  }
+
+  /**
+   * Creates an instance of the <code>ClientThrottlingAnalyzer</code> class with
+   * the specified name and period, and starts the analysis timer.
+   *
+   * @param name A name used to identify this instance.
+   *
+   * @param period The frequency, in milliseconds, at which metrics are
+   * analyzed.
+   *
+   * @throws IllegalArgumentException
+   * If name is null or empty.
+   * If period is less than 1000 or greater than 30000 milliseconds.
+   */
+  ClientThrottlingAnalyzer(String name, int period)
+      throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        StringUtils.isNotEmpty(name),
+        "The argument 'name' cannot be null or empty.");
+    Preconditions.checkArgument(
+        period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
+        "The argument 'period' must be between 1000 and 30000.");
+    this.name = name;
+    this.analysisPeriodMs = period;
+    this.blobMetrics = new AtomicReference<BlobOperationMetrics>(
+        new BlobOperationMetrics(System.currentTimeMillis()));
+    // Run the timer on a daemon thread: this class never cancels it, and a
+    // non-daemon thread would keep the JVM alive after all other work ends.
+    this.timer = new Timer(
+        String.format("wasb-timer-client-throttling-analyzer-%s", name), true);
+    this.timer.schedule(new TimerTaskImpl(),
+        analysisPeriodMs,
+        analysisPeriodMs);
+  }
+
+  /**
+   * Updates metrics with results from the current storage operation.
+   *
+   * @param count The count of bytes transferred.
+   *
+   * @param isFailedOperation True if the operation failed; otherwise false.
+   */
+  public void addBytesTransferred(long count, boolean isFailedOperation) {
+    BlobOperationMetrics metrics = blobMetrics.get();
+    if (isFailedOperation) {
+      metrics.bytesFailed.addAndGet(count);
+      metrics.operationsFailed.incrementAndGet();
+    } else {
+      metrics.bytesSuccessful.addAndGet(count);
+      metrics.operationsSuccessful.incrementAndGet();
+    }
+  }
+
+  /**
+   * Suspends the current storage operation, as necessary, to reduce throughput.
+   * If the thread is interrupted while sleeping, the interrupt flag is restored
+   * and the method returns early.
+   */
+  public void suspendIfNecessary() {
+    int duration = sleepDuration;
+    if (duration > 0) {
+      try {
+        Thread.sleep(duration);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  int getSleepDuration() {
+    return sleepDuration;
+  }
+
+  /**
+   * Computes the sleep duration for the next period from the metrics of the
+   * period that just ended.
+   *
+   * @param metrics the metrics of the elapsed period.
+   * @param currentSleepDuration the sleep duration used during that period.
+   * @return the new sleep duration, in milliseconds.
+   */
+  private int analyzeMetricsAndUpdateSleepDuration(BlobOperationMetrics metrics,
+      int currentSleepDuration) {
+    final double percentageConversionFactor = 100;
+    double bytesFailed = metrics.bytesFailed.get();
+    double bytesSuccessful = metrics.bytesSuccessful.get();
+    double operationsFailed = metrics.operationsFailed.get();
+    double operationsSuccessful = metrics.operationsSuccessful.get();
+    double errorPercentage = (bytesFailed <= 0)
+        ? 0
+        : percentageConversionFactor
+        * bytesFailed
+        / (bytesFailed + bytesSuccessful);
+    long periodMs = metrics.endTime - metrics.startTime;
+
+    double newSleepDuration;
+
+    if (errorPercentage < MIN_ACCEPTABLE_ERROR_PERCENTAGE) {
+      ++consecutiveNoErrorCount;
+      // Decrease sleepDuration in order to increase throughput.
+      double reductionFactor =
+          (consecutiveNoErrorCount * analysisPeriodMs
+          >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
+          ? RAPID_SLEEP_DECREASE_FACTOR
+          : SLEEP_DECREASE_FACTOR;
+
+      newSleepDuration = currentSleepDuration * reductionFactor;
+    } else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) {
+      // Do not modify sleepDuration in order to stabilize throughput.
+      newSleepDuration = currentSleepDuration;
+    } else {
+      // Increase sleepDuration in order to minimize error rate.
+      consecutiveNoErrorCount = 0;
+
+      // Increase sleep duration in order to reduce throughput and error rate.
+      // First, calculate target throughput: bytesSuccessful / periodMs.
+      // Next, calculate time required to send *all* data (assuming next period
+      // is similar to previous) at the target throughput: (bytesSuccessful
+      // + bytesFailed) * periodMs / bytesSuccessful. Next, subtract periodMs to
+      // get the total additional delay needed.
+      double additionalDelayNeeded = 5 * analysisPeriodMs;
+      if (bytesSuccessful > 0) {
+        additionalDelayNeeded = (bytesSuccessful + bytesFailed)
+            * periodMs
+            / bytesSuccessful
+            - periodMs;
+      }
+
+      // amortize the additional delay needed across the estimated number of
+      // requests during the next period
+      newSleepDuration = additionalDelayNeeded
+          / (operationsFailed + operationsSuccessful);
+
+      final double maxSleepDuration = analysisPeriodMs;
+      final double minSleepDuration =
+          currentSleepDuration * SLEEP_INCREASE_FACTOR;
+
+      // Add 1 ms to avoid rounding down and to decrease proximity to the server
+      // side ingress/egress limit. Ensure that the new sleep duration is
+      // larger than the current one to more quickly reduce the number of
+      // errors. Don't allow the sleep duration to grow unbounded, after a
+      // certain point throttling won't help, for example, if there are far too
+      // many tasks/containers/nodes no amount of throttling will help.
+      newSleepDuration = Math.max(newSleepDuration, minSleepDuration) + 1;
+      newSleepDuration = Math.min(newSleepDuration, maxSleepDuration);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // Byte and operation counts are logged as long: an int cast would
+      // overflow once more than 2 GiB are transferred in one period.
+      LOG.debug(String.format(
+          "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
+          name,
+          (long) bytesFailed,
+          (long) bytesSuccessful,
+          (long) operationsFailed,
+          (long) operationsSuccessful,
+          errorPercentage,
+          periodMs,
+          currentSleepDuration,
+          (int) newSleepDuration));
+    }
+
+    return (int) newSleepDuration;
+  }
+
+  /**
+   * Timer callback implementation for periodically analyzing metrics.
+   */
+  class TimerTaskImpl extends TimerTask {
+    private final AtomicInteger doingWork = new AtomicInteger(0);
+
+    /**
+     * Periodically analyzes a snapshot of the blob storage metrics and updates
+     * the sleepDuration in order to appropriately throttle storage operations.
+     */
+    @Override
+    public void run() {
+      boolean doWork = false;
+      try {
+        doWork = doingWork.compareAndSet(0, 1);
+
+        // prevent concurrent execution of this task
+        if (!doWork) {
+          return;
+        }
+
+        long now = System.currentTimeMillis();
+        if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
+          BlobOperationMetrics oldMetrics = blobMetrics.getAndSet(
+              new BlobOperationMetrics(now));
+          oldMetrics.endTime = now;
+          sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
+              sleepDuration);
+        }
+      } finally {
+        if (doWork) {
+          doingWork.set(0);
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores blob operation metrics during each analysis period.
+   */
+  static class BlobOperationMetrics {
+    private final AtomicLong bytesFailed;
+    private final AtomicLong bytesSuccessful;
+    private final AtomicLong operationsFailed;
+    private final AtomicLong operationsSuccessful;
+    private final long startTime;
+    private long endTime;
+
+    BlobOperationMetrics(long startTime) {
+      this.startTime = startTime;
+      this.bytesFailed = new AtomicLong();
+      this.bytesSuccessful = new AtomicLong();
+      this.operationsFailed = new AtomicLong();
+      this.operationsSuccessful = new AtomicLong();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java
new file mode 100644
index 0000000..9da993b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.microsoft.azure.storage.ErrorReceivingResponseEvent;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.RequestResult;
+import com.microsoft.azure.storage.ResponseReceivedEvent;
+import com.microsoft.azure.storage.SendingRequestEvent;
+import com.microsoft.azure.storage.StorageEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.net.HttpURLConnection;
+
+/**
+ * Throttles Azure Storage read and write operations to achieve maximum
+ * throughput by minimizing errors. The errors occur when the account ingress
+ * or egress limits are exceeded and the server throttles requests.
+ * Server-side throttling causes the retry policy to be used, but the retry
+ * policy sleeps for long periods of time causing the total ingress or egress
+ * throughput to be as much as 35% lower than optimal. The retry policy is also
+ * after the fact, in that it applies after a request fails. On the other hand,
+ * the client-side throttling implemented here happens before requests are made
+ * and sleeps just enough to minimize errors, allowing optimal ingress and/or
+ * egress throughput.
+ *
+ * <p>Usage: call {@link #initializeSingleton()} first, then {@link #hook} each
+ * {@link OperationContext} whose requests should be throttled. The listeners
+ * installed by {@code hook} dereference the shared instance for read/write
+ * operations, so initialization must happen before those requests are made.
+ */
+@InterfaceAudience.Private
+final class ClientThrottlingIntercept {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ClientThrottlingIntercept.class);
+
+  /**
+   * The shared instance. It is written under the class lock by
+   * {@link #initializeSingleton()} but read without locking by the event
+   * listeners, which may run on a different thread than the initializer.
+   * {@code volatile} guarantees those readers see the fully built instance.
+   */
+  private static volatile ClientThrottlingIntercept singleton = null;
+  private final ClientThrottlingAnalyzer readThrottler;
+  private final ClientThrottlingAnalyzer writeThrottler;
+
+  // Hide default constructor; the only instance is created by
+  // initializeSingleton().
+  private ClientThrottlingIntercept() {
+    readThrottler = new ClientThrottlingAnalyzer("read");
+    writeThrottler = new ClientThrottlingAnalyzer("write");
+    LOG.debug("Client-side throttling is enabled for the WASB file system.");
+  }
+
+  /**
+   * Creates the shared instance if it does not exist yet. Calling this more
+   * than once is harmless; later calls do nothing.
+   */
+  static synchronized void initializeSingleton() {
+    if (singleton == null) {
+      singleton = new ClientThrottlingIntercept();
+    }
+  }
+
+  /**
+   * Registers the error, sending-request and response-received listeners on
+   * the given context, so requests made with it are throttled and measured.
+   *
+   * @param context the operation context to instrument.
+   */
+  static void hook(OperationContext context) {
+    context.getErrorReceivingResponseEventHandler().addListener(
+        new ErrorReceivingResponseEventHandler());
+    context.getSendingRequestEventHandler().addListener(
+        new SendingRequestEventHandler());
+    context.getResponseReceivedEventHandler().addListener(
+        new ResponseReceivedEventHandler());
+  }
+
+  /**
+   * Maps a blob operation type to the analyzer responsible for it.
+   *
+   * @param operationType the classified blob operation.
+   * @return the read analyzer for GetBlob; the write analyzer for
+   *     AppendBlock, PutBlock and PutPage; {@code null} for every other
+   *     operation, which is neither throttled nor measured.
+   */
+  private static ClientThrottlingAnalyzer getThrottler(
+      BlobOperationDescriptor.OperationType operationType) {
+    switch (operationType) {
+      case GetBlob:
+        return singleton.readThrottler;
+      case AppendBlock:
+      case PutBlock:
+      case PutPage:
+        return singleton.writeThrottler;
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Records the outcome of a completed (or failed) request with the analyzer
+   * responsible for its operation type.
+   *
+   * @param conn the connection the request was made on.
+   * @param result the request result carrying the HTTP status code.
+   */
+  private static void updateMetrics(HttpURLConnection conn,
+      RequestResult result) {
+    BlobOperationDescriptor.OperationType operationType
+        = BlobOperationDescriptor.getOperationType(conn);
+    int status = result.getStatusCode();
+    // If the socket is terminated prior to receiving a response, the HTTP
+    // status may be 0 or -1. A status less than 200 or greater than or equal
+    // to 500 is considered an error.
+    boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
+        || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
+
+    ClientThrottlingAnalyzer throttler = getThrottler(operationType);
+    if (throttler == null) {
+      return;
+    }
+    long contentLength = BlobOperationDescriptor.getContentLengthIfKnown(conn,
+        operationType);
+    // Nothing is recorded unless a positive content length was obtained.
+    if (contentLength > 0) {
+      throttler.addBytesTransferred(contentLength, isFailedOperation);
+    }
+  }
+
+  /**
+   * Called when a network error occurs before the HTTP status and response
+   * headers are received. Client-side throttling uses this to collect metrics.
+   *
+   * @param event The connection, operation, and request state.
+   */
+  public static void errorReceivingResponse(ErrorReceivingResponseEvent event) {
+    updateMetrics((HttpURLConnection) event.getConnectionObject(),
+        event.getRequestResult());
+  }
+
+  /**
+   * Called before the Azure Storage SDK sends a request. Client-side throttling
+   * uses this to suspend the request, if necessary, to minimize errors and
+   * maximize throughput.
+   *
+   * @param event The connection, operation, and request state.
+   */
+  public static void sendingRequest(SendingRequestEvent event) {
+    BlobOperationDescriptor.OperationType operationType
+        = BlobOperationDescriptor.getOperationType(
+            (HttpURLConnection) event.getConnectionObject());
+    ClientThrottlingAnalyzer throttler = getThrottler(operationType);
+    if (throttler != null) {
+      throttler.suspendIfNecessary();
+    }
+  }
+
+  /**
+   * Called after the Azure Storage SDK receives a response. Client-side
+   * throttling uses this to collect metrics.
+   *
+   * @param event The connection, operation, and request state.
+   */
+  public static void responseReceived(ResponseReceivedEvent event) {
+    updateMetrics((HttpURLConnection) event.getConnectionObject(),
+        event.getRequestResult());
+  }
+
+  /**
+   * The ErrorReceivingResponseEvent is fired when the Azure Storage SDK
+   * encounters a network error before the HTTP status and response headers are
+   * received.
+   */
+  @InterfaceAudience.Private
+  static class ErrorReceivingResponseEventHandler
+      extends StorageEvent<ErrorReceivingResponseEvent> {
+
+    /**
+     * Called when a network error occurs before the HTTP status and response
+     * headers are received. Client-side throttling uses this to collect
+     * metrics.
+     *
+     * @param event The connection, operation, and request state.
+     */
+    @Override
+    public void eventOccurred(ErrorReceivingResponseEvent event) {
+      ClientThrottlingIntercept.errorReceivingResponse(event);
+    }
+  }
+
+  /**
+   * The SendingRequestEvent is fired before the Azure Storage SDK sends a
+   * request.
+   */
+  @InterfaceAudience.Private
+  static class SendingRequestEventHandler
+      extends StorageEvent<SendingRequestEvent> {
+
+    /**
+     * Called before the Azure Storage SDK sends a request. Client-side
+     * throttling uses this to suspend the request, if necessary, to minimize
+     * errors and maximize throughput.
+     *
+     * @param event The connection, operation, and request state.
+     */
+    @Override
+    public void eventOccurred(SendingRequestEvent event) {
+      ClientThrottlingIntercept.sendingRequest(event);
+    }
+  }
+
+  /**
+   * The ResponseReceivedEvent is fired after the Azure Storage SDK receives a
+   * response.
+   */
+  @InterfaceAudience.Private
+  static class ResponseReceivedEventHandler
+      extends StorageEvent<ResponseReceivedEvent> {
+
+    /**
+     * Called after the Azure Storage SDK receives a response. Client-side
+     * throttling uses this to collect metrics.
+     *
+     * @param event The connection, operation, and request state.
+     */
+    @Override
+    public void eventOccurred(ResponseReceivedEvent event) {
+      ClientThrottlingIntercept.responseReceived(event);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
index 51867cd..d04a19c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
@@ -67,4 +67,8 @@ public abstract class AbstractWasbTestBase {
protected abstract AzureBlobStorageTestAccount createTestAccount()
throws Exception;
+
+  /**
+   * Gives subclasses access to the {@code testAccount} held by this base
+   * class.
+   *
+   * @return the current value of the {@code testAccount} field.
+   */
+  protected AzureBlobStorageTestAccount getTestAccount() {
+    return this.testAccount;
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java
new file mode 100644
index 0000000..07d4ebc
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.ResponseReceivedEvent;
+import com.microsoft.azure.storage.SendingRequestEvent;
+import com.microsoft.azure.storage.StorageEvent;
+import com.microsoft.azure.storage.blob.BlobInputStream;
+import com.microsoft.azure.storage.blob.BlobOutputStream;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.junit.Test;
+
+import java.net.HttpURLConnection;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for <code>BlobOperationDescriptor</code>.
+ */
+public class TestBlobOperationDescriptor extends AbstractWasbTestBase {
+ private BlobOperationDescriptor.OperationType lastOperationTypeReceived;
+ private BlobOperationDescriptor.OperationType lastOperationTypeSent;
+ private long lastContentLengthReceived;
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+
+ @Test
+ public void testAppendBlockOperations() throws Exception {
+ CloudBlobContainer container = getTestAccount().getRealContainer();
+
+ OperationContext context = new OperationContext();
+ context.getResponseReceivedEventHandler().addListener(
+ new ResponseReceivedEventHandler());
+ context.getSendingRequestEventHandler().addListener(
+ new SendingRequestEventHandler());
+
+ CloudAppendBlob appendBlob = container.getAppendBlobReference(
+ "testAppendBlockOperations");
+ assertNull(lastOperationTypeSent);
+ assertNull(lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+
+ try (
+ BlobOutputStream output
+ = appendBlob.openWriteNew(null, null, context);
+ ) {
+ assertEquals(BlobOperationDescriptor.OperationType.CreateBlob,
+ lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+
+ String message = "this is a test";
+ output.write(message.getBytes("UTF-8"));
+ output.flush();
+ assertEquals(BlobOperationDescriptor.OperationType.AppendBlock,
+ lastOperationTypeSent);
+ assertEquals(BlobOperationDescriptor.OperationType.AppendBlock,
+ lastOperationTypeReceived);
+ assertEquals(message.length(), lastContentLengthReceived);
+ }
+ }
+
+ @Test
+ public void testPutBlockOperations() throws Exception {
+ CloudBlobContainer container = getTestAccount().getRealContainer();
+
+ OperationContext context = new OperationContext();
+ context.getResponseReceivedEventHandler().addListener(
+ new ResponseReceivedEventHandler());
+ context.getSendingRequestEventHandler().addListener(
+ new SendingRequestEventHandler());
+
+ CloudBlockBlob blockBlob = container.getBlockBlobReference(
+ "testPutBlockOperations");
+ assertNull(lastOperationTypeSent);
+ assertNull(lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+
+ try (
+ BlobOutputStream output
+ = blockBlob.openOutputStream(null,
+ null,
+ context);
+ ) {
+ assertNull(lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+
+ String message = "this is a test";
+ output.write(message.getBytes("UTF-8"));
+ output.flush();
+ assertEquals(BlobOperationDescriptor.OperationType.PutBlock,
+ lastOperationTypeSent);
+ assertEquals(BlobOperationDescriptor.OperationType.PutBlock,
+ lastOperationTypeReceived);
+ assertEquals(message.length(), lastContentLengthReceived);
+ }
+ assertEquals(BlobOperationDescriptor.OperationType.PutBlockList,
+ lastOperationTypeSent);
+ assertEquals(BlobOperationDescriptor.OperationType.PutBlockList,
+ lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+ }
+
+ @Test
+ public void testPutPageOperations() throws Exception {
+ CloudBlobContainer container = getTestAccount().getRealContainer();
+
+ OperationContext context = new OperationContext();
+ context.getResponseReceivedEventHandler().addListener(
+ new ResponseReceivedEventHandler());
+ context.getSendingRequestEventHandler().addListener(
+ new SendingRequestEventHandler());
+
+ CloudPageBlob pageBlob = container.getPageBlobReference(
+ "testPutPageOperations");
+ assertNull(lastOperationTypeSent);
+ assertNull(lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+
+ try (
+ BlobOutputStream output = pageBlob.openWriteNew(1024,
+ null,
+ null,
+ context);
+ ) {
+ assertEquals(BlobOperationDescriptor.OperationType.CreateBlob,
+ lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+
+ final int pageSize = 512;
+ byte[] buffer = new byte[pageSize];
+ output.write(buffer);
+ output.flush();
+ assertEquals(BlobOperationDescriptor.OperationType.PutPage,
+ lastOperationTypeSent);
+ assertEquals(BlobOperationDescriptor.OperationType.PutPage,
+ lastOperationTypeReceived);
+ assertEquals(buffer.length, lastContentLengthReceived);
+ }
+ }
+
+ @Test
+ public void testGetBlobOperations() throws Exception {
+ CloudBlobContainer container = getTestAccount().getRealContainer();
+
+ OperationContext context = new OperationContext();
+ context.getResponseReceivedEventHandler().addListener(
+ new ResponseReceivedEventHandler());
+ context.getSendingRequestEventHandler().addListener(
+ new SendingRequestEventHandler());
+
+ CloudBlockBlob blockBlob = container.getBlockBlobReference(
+ "testGetBlobOperations");
+ assertNull(lastOperationTypeSent);
+ assertNull(lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+
+ String message = "this is a test";
+
+ try (
+ BlobOutputStream output = blockBlob.openOutputStream(null,
+ null,
+ context);
+ ) {
+ assertNull(lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+
+ output.write(message.getBytes("UTF-8"));
+ output.flush();
+ assertEquals(BlobOperationDescriptor.OperationType.PutBlock,
+ lastOperationTypeSent);
+ assertEquals(BlobOperationDescriptor.OperationType.PutBlock,
+ lastOperationTypeReceived);
+ assertEquals(message.length(), lastContentLengthReceived);
+ }
+ assertEquals(BlobOperationDescriptor.OperationType.PutBlockList,
+ lastOperationTypeSent);
+ assertEquals(BlobOperationDescriptor.OperationType.PutBlockList,
+ lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+
+ try (
+ BlobInputStream input = blockBlob.openInputStream(null,
+ null,
+ context);
+ ) {
+ assertEquals(BlobOperationDescriptor.OperationType.GetProperties,
+ lastOperationTypeSent);
+ assertEquals(BlobOperationDescriptor.OperationType.GetProperties,
+ lastOperationTypeReceived);
+ assertEquals(0, lastContentLengthReceived);
+
+ byte[] buffer = new byte[1024];
+ int numBytesRead = input.read(buffer);
+ assertEquals(BlobOperationDescriptor.OperationType.GetBlob,
+ lastOperationTypeSent);
+ assertEquals(BlobOperationDescriptor.OperationType.GetBlob,
+ lastOperationTypeReceived);
+ assertEquals(message.length(), lastContentLengthReceived);
+ assertEquals(numBytesRead, lastContentLengthReceived);
+ }
+ }
+
+ /**
+ * Called after the Azure Storage SDK receives a response.
+ *
+ * @param event The connection, operation, and request state.
+ */
+ private void responseReceived(ResponseReceivedEvent event) {
+ HttpURLConnection conn = (HttpURLConnection) event.getConnectionObject();
+ BlobOperationDescriptor.OperationType operationType
+ = BlobOperationDescriptor.getOperationType(conn);
+ lastOperationTypeReceived = operationType;
+
+ switch (operationType) {
+ case AppendBlock:
+ case PutBlock:
+ case PutPage:
+ lastContentLengthReceived
+ = BlobOperationDescriptor.getContentLengthIfKnown(conn,
+ operationType);
+ break;
+ case GetBlob:
+ lastContentLengthReceived
+ = BlobOperationDescriptor.getContentLengthIfKnown(conn,
+ operationType);
+ break;
+ default:
+ lastContentLengthReceived = 0;
+ break;
+ }
+ }
+
+ /**
+ * Called before the Azure Storage SDK sends a request.
+ *
+ * @param event The connection, operation, and request state.
+ */
+ private void sendingRequest(SendingRequestEvent event) {
+ this.lastOperationTypeSent
+ = BlobOperationDescriptor.getOperationType(
+ (HttpURLConnection) event.getConnectionObject());
+ }
+
+ /**
+ * The ResponseReceivedEvent is fired after the Azure Storage SDK receives a
+ * response.
+ */
+ @InterfaceAudience.Private
+ class ResponseReceivedEventHandler
+ extends StorageEvent<ResponseReceivedEvent> {
+
+ /**
+ * Called after the Azure Storage SDK receives a response.
+ *
+ * @param event The connection, operation, and request state.
+ */
+ @Override
+ public void eventOccurred(ResponseReceivedEvent event) {
+ responseReceived(event);
+ }
+ }
+
+ /**
+ * The SendingRequestEvent is fired before the Azure Storage SDK sends a
+ * request.
+ */
+ @InterfaceAudience.Private
+ class SendingRequestEventHandler extends StorageEvent<SendingRequestEvent> {
+
+ /**
+ * Called before the Azure Storage SDK sends a request.
+ *
+ * @param event The connection, operation, and request state.
+ */
+ @Override
+ public void eventOccurred(SendingRequestEvent event) {
+ sendingRequest(event);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/778d4edd/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java
new file mode 100644
index 0000000..307e5af
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for <code>ClientThrottlingAnalyzer</code>. The tests use real
+ * wall-clock sleeps of slightly more than one analysis period, to give the
+ * analyzer's periodic analysis a chance to run before the sleep duration is
+ * checked.
+ */
+public class TestClientThrottlingAnalyzer {
+  private static final int ANALYSIS_PERIOD = 1000;
+  private static final int ANALYSIS_PERIOD_PLUS_10_PERCENT = ANALYSIS_PERIOD
+      + ANALYSIS_PERIOD / 10;
+  private static final long MEGABYTE = 1024 * 1024;
+  private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
+
+  /**
+   * Sleeps for the given time, restoring the interrupt flag if interrupted.
+   */
+  private void sleep(long milliseconds) {
+    try {
+      Thread.sleep(milliseconds);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Asserts that {@code actual} lies within {@code percentage} percent of
+   * {@code expected}. The lower bound is clamped at zero.
+   */
+  private void fuzzyValidate(long expected, long actual, double percentage) {
+    final double tolerance = percentage / 100 * expected;
+    final double lowerBound = Math.max(expected - tolerance, 0);
+    final double upperBound = expected + tolerance;
+
+    assertTrue(
+        String.format(
+            "The actual value %1$d is not within the expected range: "
+                + "[%2$.2f, %3$.2f].",
+            actual,
+            lowerBound,
+            upperBound),
+        actual >= lowerBound && actual <= upperBound);
+  }
+
+  /**
+   * Asserts exact equality, with a message naming both values.
+   */
+  private void validate(long expected, long actual) {
+    assertEquals(
+        String.format("The actual value %1$d is not the expected value %2$d.",
+            actual,
+            expected),
+        expected, actual);
+  }
+
+  /**
+   * Asserts {@code actual <= maxExpected}. The bound is inclusive, matching
+   * the method name and the failure message.
+   */
+  private void validateLessThanOrEqual(long maxExpected, long actual) {
+    assertTrue(
+        String.format(
+            "The actual value %1$d is not less than or equal to the maximum"
+                + " expected value %2$d.",
+            actual,
+            maxExpected),
+        actual <= maxExpected);
+  }
+
+  /**
+   * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
+   * never been updated. This validates proper initialization of
+   * ClientThrottlingAnalyzer.
+   */
+  @Test
+  public void testNoMetricUpdatesThenNoWaiting() {
+    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    validate(0, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
+   * only been updated with successful requests.
+   */
+  @Test
+  public void testOnlySuccessThenNoWaiting() {
+    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    analyzer.addBytesTransferred(8 * MEGABYTE, false);
+    validate(0, analyzer.getSleepDuration());
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    validate(0, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+   * only been updated with failed requests. Also ensure that the
+   * sleepDuration decreases over time.
+   */
+  @Test
+  public void testOnlyErrorsAndWaiting() {
+    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    analyzer.addBytesTransferred(4 * MEGABYTE, true);
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    final int expectedSleepDuration1 = 1100;
+    validateLessThanOrEqual(expectedSleepDuration1,
+        analyzer.getSleepDuration());
+    sleep(10 * ANALYSIS_PERIOD);
+    final int expectedSleepDuration2 = 900;
+    validateLessThanOrEqual(expectedSleepDuration2,
+        analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+   * only been updated with both successful and failed requests. Also ensure
+   * that the sleepDuration decreases over time.
+   */
+  @Test
+  public void testSuccessAndErrorsAndWaiting() {
+    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    analyzer.addBytesTransferred(8 * MEGABYTE, false);
+    analyzer.addBytesTransferred(2 * MEGABYTE, true);
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    NanoTimer timer = new NanoTimer();
+    analyzer.suspendIfNecessary();
+    final int expectedElapsedTime = 126;
+    fuzzyValidate(expectedElapsedTime,
+        timer.elapsedTimeMs(),
+        MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
+    sleep(10 * ANALYSIS_PERIOD);
+    final int expectedSleepDuration = 110;
+    validateLessThanOrEqual(expectedSleepDuration,
+        analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+   * only been updated with many successful and failed requests. Also ensure
+   * that the sleepDuration decreases to zero over time.
+   */
+  @Test
+  public void testManySuccessAndErrorsAndWaiting() {
+    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    final int numberOfRequests = 20;
+    for (int i = 0; i < numberOfRequests; i++) {
+      analyzer.addBytesTransferred(8 * MEGABYTE, false);
+      analyzer.addBytesTransferred(2 * MEGABYTE, true);
+    }
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    NanoTimer timer = new NanoTimer();
+    analyzer.suspendIfNecessary();
+    final int expectedElapsedTime = 7;
+    fuzzyValidate(expectedElapsedTime,
+        timer.elapsedTimeMs(),
+        MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
+    sleep(10 * ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org