You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2018/09/23 03:24:33 UTC
[36/45] hadoop git commit: HADOOP-15703. ABFS - Implement client-side
throttling. Contributed by Sneha Varma and Thomas Marquardt.
HADOOP-15703. ABFS - Implement client-side throttling.
Contributed by Sneha Varma and Thomas Marquardt.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/97f06b3f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/97f06b3f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/97f06b3f
Branch: refs/heads/trunk
Commit: 97f06b3fc70ad509e601076c015bc244daa1243f
Parents: 4410eac
Author: Thomas Marquardt <tm...@microsoft.com>
Authored: Mon Sep 3 01:37:10 2018 +0000
Committer: Thomas Marquardt <tm...@microsoft.com>
Committed: Mon Sep 17 19:54:01 2018 +0000
----------------------------------------------------------------------
.../fs/azure/ClientThrottlingAnalyzer.java | 2 +-
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 8 +
.../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 3 +
.../azurebfs/constants/ConfigurationKeys.java | 2 +-
.../constants/FileSystemConfigurations.java | 1 +
.../hadoop/fs/azurebfs/services/AbfsClient.java | 17 ++
.../services/AbfsClientThrottlingAnalyzer.java | 272 +++++++++++++++++++
.../services/AbfsClientThrottlingIntercept.java | 117 ++++++++
.../fs/azurebfs/services/AbfsRestOperation.java | 15 +-
.../services/AbfsRestOperationType.java | 42 +++
.../TestAbfsClientThrottlingAnalyzer.java | 159 +++++++++++
11 files changed, 633 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
index 850e552..859a608 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
@@ -99,7 +99,7 @@ class ClientThrottlingAnalyzer {
this.blobMetrics = new AtomicReference<BlobOperationMetrics>(
new BlobOperationMetrics(System.currentTimeMillis()));
this.timer = new Timer(
- String.format("wasb-timer-client-throttling-analyzer-%s", name));
+ String.format("wasb-timer-client-throttling-analyzer-%s", name), true);
this.timer.schedule(new TimerTaskImpl(),
analysisPeriodMs,
analysisPeriodMs);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 924bc3e..518fef9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -141,6 +141,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_FLUSH)
private boolean enableFlush;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING,
+ DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
+ private boolean enableAutoThrottling;
+
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
DefaultValue = "")
private String userAgentId;
@@ -279,6 +283,10 @@ public class AbfsConfiguration{
return this.enableFlush;
}
+ public boolean isAutoThrottlingEnabled() {
+ return this.enableAutoThrottling;
+ }
+
public String getCustomUserAgentPrefix() {
return this.userAgentId;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index b809192..c0ecc35 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Future;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,6 +131,8 @@ public class AzureBlobFileSystem extends FileSystem {
this.delegationTokenManager = abfsStore.getAbfsConfiguration().getDelegationTokenManager();
}
}
+
+ AbfsClientThrottlingIntercept.initializeSingleton(abfsStore.getAbfsConfiguration().isAutoThrottlingEnabled());
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index ca4c9c3..5236719 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -47,7 +47,7 @@ public final class ConfigurationKeys {
public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
- public static final String FS_AZURE_AUTOTHROTTLING_ENABLE = "fs.azure.autothrottling.enable";
+ public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index a921faf..a9412a9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -57,6 +57,7 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
public static final boolean DEFAULT_ENABLE_FLUSH = true;
+ public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
public static final SSLSocketFactoryEx.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
= SSLSocketFactoryEx.SSLChannelMode.Default;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 18773b6..258045a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -125,6 +125,7 @@ public class AbfsClient {
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.CreateFileSystem,
this,
HTTP_METHOD_PUT,
url,
@@ -148,6 +149,7 @@ public class AbfsClient {
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.SetFileSystemProperties,
this,
HTTP_METHOD_PUT,
url,
@@ -170,6 +172,7 @@ public class AbfsClient {
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.ListPaths,
this,
HTTP_METHOD_GET,
url,
@@ -186,6 +189,7 @@ public class AbfsClient {
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.GetFileSystemProperties,
this,
HTTP_METHOD_HEAD,
url,
@@ -202,6 +206,7 @@ public class AbfsClient {
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.DeleteFileSystem,
this,
HTTP_METHOD_DELETE,
url,
@@ -230,6 +235,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.CreatePath,
this,
HTTP_METHOD_PUT,
url,
@@ -251,6 +257,7 @@ public class AbfsClient {
final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.RenamePath,
this,
HTTP_METHOD_PUT,
url,
@@ -273,6 +280,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.Append,
this,
HTTP_METHOD_PUT,
url,
@@ -296,6 +304,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.Flush,
this,
HTTP_METHOD_PUT,
url,
@@ -319,6 +328,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.SetPathProperties,
this,
HTTP_METHOD_PUT,
url,
@@ -334,6 +344,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.GetPathProperties,
this,
HTTP_METHOD_HEAD,
url,
@@ -354,6 +365,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.ReadFile,
this,
HTTP_METHOD_GET,
url,
@@ -376,6 +388,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.DeletePath,
this,
HTTP_METHOD_DELETE,
url,
@@ -404,6 +417,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.SetOwner,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
@@ -427,6 +441,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.SetPermissions,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
@@ -458,6 +473,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.SetAcl,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
@@ -474,6 +490,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.GetAcl,
this,
AbfsHttpConstants.HTTP_METHOD_HEAD,
url,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
new file mode 100644
index 0000000..f1e5aaa
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AbfsClientThrottlingAnalyzer {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsClientThrottlingAnalyzer.class);
+ private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
+ private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
+ private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
+ private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
+ private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1;
+ private static final double RAPID_SLEEP_DECREASE_FACTOR = .75;
+ private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150
+ * 1000;
+ private static final double SLEEP_DECREASE_FACTOR = .975;
+ private static final double SLEEP_INCREASE_FACTOR = 1.05;
+ private int analysisPeriodMs;
+
+ private volatile int sleepDuration = 0;
+ private long consecutiveNoErrorCount = 0;
+ private String name = null;
+ private Timer timer = null;
+ private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
+
+ private AbfsClientThrottlingAnalyzer() {
+ // hide default constructor
+ }
+
+ /**
+ * Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
+ * the specified name.
+ *
+ * @param name a name used to identify this instance.
+ * @throws IllegalArgumentException if name is null or empty.
+ */
+ AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
+ this(name, DEFAULT_ANALYSIS_PERIOD_MS);
+ }
+
+ /**
+ * Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
+ * the specified name and period.
+ *
+ * @param name A name used to identify this instance.
+ * @param period The frequency, in milliseconds, at which metrics are
+ * analyzed.
+ * @throws IllegalArgumentException If name is null or empty.
+ * If period is less than 1000 or greater than 30000 milliseconds.
+ */
+ AbfsClientThrottlingAnalyzer(String name, int period)
+ throws IllegalArgumentException {
+ Preconditions.checkArgument(
+ StringUtils.isNotEmpty(name),
+ "The argument 'name' cannot be null or empty.");
+ Preconditions.checkArgument(
+ period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
+ "The argument 'period' must be between 1000 and 30000.");
+ this.name = name;
+ this.analysisPeriodMs = period;
+ this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
+ new AbfsOperationMetrics(System.currentTimeMillis()));
+ this.timer = new Timer(
+ String.format("abfs-timer-client-throttling-analyzer-%s", name), true);
+ this.timer.schedule(new TimerTaskImpl(),
+ analysisPeriodMs,
+ analysisPeriodMs);
+ }
+
+ /**
+ * Updates metrics with results from the current storage operation.
+ *
+ * @param count The count of bytes transferred.
+ * @param isFailedOperation True if the operation failed; otherwise false.
+ */
+ public void addBytesTransferred(long count, boolean isFailedOperation) {
+ AbfsOperationMetrics metrics = blobMetrics.get();
+ if (isFailedOperation) {
+ metrics.bytesFailed.addAndGet(count);
+ metrics.operationsFailed.incrementAndGet();
+ } else {
+ metrics.bytesSuccessful.addAndGet(count);
+ metrics.operationsSuccessful.incrementAndGet();
+ }
+ }
+
+ /**
+ * Suspends the current storage operation, as necessary, to reduce throughput.
+ */
+ public void suspendIfNecessary() {
+ int duration = sleepDuration;
+ if (duration > 0) {
+ try {
+ Thread.sleep(duration);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @VisibleForTesting
+ int getSleepDuration() {
+ return sleepDuration;
+ }
+
+ private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
+ int sleepDuration) {
+ final double percentageConversionFactor = 100;
+ double bytesFailed = metrics.bytesFailed.get();
+ double bytesSuccessful = metrics.bytesSuccessful.get();
+ double operationsFailed = metrics.operationsFailed.get();
+ double operationsSuccessful = metrics.operationsSuccessful.get();
+ double errorPercentage = (bytesFailed <= 0)
+ ? 0
+ : (percentageConversionFactor
+ * bytesFailed
+ / (bytesFailed + bytesSuccessful));
+ long periodMs = metrics.endTime - metrics.startTime;
+
+ double newSleepDuration;
+
+ if (errorPercentage < MIN_ACCEPTABLE_ERROR_PERCENTAGE) {
+ ++consecutiveNoErrorCount;
+ // Decrease sleepDuration in order to increase throughput.
+ double reductionFactor =
+ (consecutiveNoErrorCount * analysisPeriodMs
+ >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
+ ? RAPID_SLEEP_DECREASE_FACTOR
+ : SLEEP_DECREASE_FACTOR;
+
+ newSleepDuration = sleepDuration * reductionFactor;
+ } else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) {
+ // Do not modify sleepDuration in order to stabilize throughput.
+ newSleepDuration = sleepDuration;
+ } else {
+ // Increase sleepDuration in order to minimize error rate.
+ consecutiveNoErrorCount = 0;
+
+ // Increase sleep duration in order to reduce throughput and error rate.
+ // First, calculate target throughput: bytesSuccessful / periodMs.
+ // Next, calculate time required to send *all* data (assuming next period
+ // is similar to previous) at the target throughput: (bytesSuccessful
+ // + bytesFailed) * periodMs / bytesSuccessful. Next, subtract periodMs to
+ // get the total additional delay needed.
+ double additionalDelayNeeded = 5 * analysisPeriodMs;
+ if (bytesSuccessful > 0) {
+ additionalDelayNeeded = (bytesSuccessful + bytesFailed)
+ * periodMs
+ / bytesSuccessful
+ - periodMs;
+ }
+
+ // amortize the additional delay needed across the estimated number of
+ // requests during the next period
+ newSleepDuration = additionalDelayNeeded
+ / (operationsFailed + operationsSuccessful);
+
+ final double maxSleepDuration = analysisPeriodMs;
+ final double minSleepDuration = sleepDuration * SLEEP_INCREASE_FACTOR;
+
+ // Add 1 ms to avoid rounding down and to decrease proximity to the server
+ // side ingress/egress limit. Ensure that the new sleep duration is
+ // larger than the current one to more quickly reduce the number of
+ // errors. Don't allow the sleep duration to grow unbounded, after a
+ // certain point throttling won't help, for example, if there are far too
+ // many tasks/containers/nodes no amount of throttling will help.
+ newSleepDuration = Math.max(newSleepDuration, minSleepDuration) + 1;
+ newSleepDuration = Math.min(newSleepDuration, maxSleepDuration);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
+ name,
+ (int) bytesFailed,
+ (int) bytesSuccessful,
+ (int) operationsFailed,
+ (int) operationsSuccessful,
+ errorPercentage,
+ periodMs,
+ (int) sleepDuration,
+ (int) newSleepDuration));
+ }
+
+ return (int) newSleepDuration;
+ }
+
+ /**
+ * Timer callback implementation for periodically analyzing metrics.
+ */
+ class TimerTaskImpl extends TimerTask {
+ private AtomicInteger doingWork = new AtomicInteger(0);
+
+ /**
+ * Periodically analyzes a snapshot of the blob storage metrics and updates
+ * the sleepDuration in order to appropriately throttle storage operations.
+ */
+ @Override
+ public void run() {
+ boolean doWork = false;
+ try {
+ doWork = doingWork.compareAndSet(0, 1);
+
+ // prevent concurrent execution of this task
+ if (!doWork) {
+ return;
+ }
+
+ long now = System.currentTimeMillis();
+ if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
+ AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
+ new AbfsOperationMetrics(now));
+ oldMetrics.endTime = now;
+ sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
+ sleepDuration);
+ }
+ } finally {
+ if (doWork) {
+ doingWork.set(0);
+ }
+ }
+ }
+ }
+
+ /**
+ * Stores Abfs operation metrics during each analysis period.
+ */
+ static class AbfsOperationMetrics {
+ private AtomicLong bytesFailed;
+ private AtomicLong bytesSuccessful;
+ private AtomicLong operationsFailed;
+ private AtomicLong operationsSuccessful;
+ private long endTime;
+ private long startTime;
+
+ AbfsOperationMetrics(long startTime) {
+ this.startTime = startTime;
+ this.bytesFailed = new AtomicLong();
+ this.bytesSuccessful = new AtomicLong();
+ this.operationsFailed = new AtomicLong();
+ this.operationsSuccessful = new AtomicLong();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
new file mode 100644
index 0000000..e981d76
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.HttpURLConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Throttles Azure Blob File System read and write operations to achieve maximum
+ * throughput by minimizing errors. The errors occur when the account ingress
+ * or egress limits are exceeded and the server-side throttles requests.
+ * Server-side throttling causes the retry policy to be used, but the retry
+ * policy sleeps for long periods of time causing the total ingress or egress
+ * throughput to be as much as 35% lower than optimal. The retry policy is also
+ * after the fact, in that it applies after a request fails. On the other hand,
+ * the client-side throttling implemented here happens before requests are made
+ * and sleeps just enough to minimize errors, allowing optimal ingress and/or
+ * egress throughput.
+ */
+public final class AbfsClientThrottlingIntercept {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsClientThrottlingIntercept.class);
+ private static AbfsClientThrottlingIntercept singleton = null;
+ private AbfsClientThrottlingAnalyzer readThrottler = null;
+ private AbfsClientThrottlingAnalyzer writeThrottler = null;
+ private static boolean isAutoThrottlingEnabled = false;
+
+ // Hide default constructor
+ private AbfsClientThrottlingIntercept() {
+ readThrottler = new AbfsClientThrottlingAnalyzer("read");
+ writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ isAutoThrottlingEnabled = true;
+ LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+ }
+
+ public static synchronized void initializeSingleton(boolean isAutoThrottlingEnabled) {
+ if (!isAutoThrottlingEnabled) {
+ return;
+ }
+ if (singleton == null) {
+ singleton = new AbfsClientThrottlingIntercept();
+ }
+ }
+
+ static void updateMetrics(AbfsRestOperationType operationType,
+ AbfsHttpOperation abfsHttpOperation) {
+ if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
+ return;
+ }
+
+ int status = abfsHttpOperation.getStatusCode();
+ long contentLength = 0;
+ // If the socket is terminated prior to receiving a response, the HTTP
+ // status may be 0 or -1. A status less than 200 or greater than or equal
+ // to 500 is considered an error.
+ boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
+ || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
+
+ switch (operationType) {
+ case Append:
+ contentLength = abfsHttpOperation.getBytesSent();
+ if (contentLength > 0) {
+ singleton.writeThrottler.addBytesTransferred(contentLength,
+ isFailedOperation);
+ }
+ break;
+ case ReadFile:
+ contentLength = abfsHttpOperation.getBytesReceived();
+ if (contentLength > 0) {
+ singleton.readThrottler.addBytesTransferred(contentLength,
+ isFailedOperation);
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Called before the request is sent. Client-side throttling
+ * uses this to suspend the request, if necessary, to minimize errors and
+ * maximize throughput.
+ */
+ static void sendingRequest(AbfsRestOperationType operationType) {
+ if (!isAutoThrottlingEnabled) {
+ return;
+ }
+
+ switch (operationType) {
+ case ReadFile:
+ singleton.readThrottler.suspendIfNecessary();
+ break;
+ case Append:
+ singleton.writeThrottler.suspendIfNecessary();
+ break;
+ default:
+ break;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index c0407f5..9a71879 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
* The AbfsRestOperation for Rest AbfsClient.
*/
public class AbfsRestOperation {
+ // The type of the REST operation (Append, ReadFile, etc)
+ private final AbfsRestOperationType operationType;
// Blob FS client, which has the credentials, retry policy, and logs.
private final AbfsClient client;
// the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
@@ -71,10 +73,12 @@ public class AbfsRestOperation {
* @param url The full URL including query string parameters.
* @param requestHeaders The HTTP request headers.
*/
- AbfsRestOperation(final AbfsClient client,
+ AbfsRestOperation(final AbfsRestOperationType operationType,
+ final AbfsClient client,
final String method,
final URL url,
final List<AbfsHttpHeader> requestHeaders) {
+ this.operationType = operationType;
this.client = client;
this.method = method;
this.url = url;
@@ -86,6 +90,7 @@ public class AbfsRestOperation {
/**
* Initializes a new REST operation.
*
+ * @param operationType The type of the REST operation (Append, ReadFile, etc).
* @param client The Blob FS client.
* @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
* @param url The full URL including query string parameters.
@@ -95,14 +100,15 @@ public class AbfsRestOperation {
* @param bufferOffset An offset into the buffer where the data beings.
* @param bufferLength The length of the data in the buffer.
*/
- AbfsRestOperation(AbfsClient client,
+ AbfsRestOperation(AbfsRestOperationType operationType,
+ AbfsClient client,
String method,
URL url,
List<AbfsHttpHeader> requestHeaders,
byte[] buffer,
int bufferOffset,
int bufferLength) {
- this(client, method, url, requestHeaders);
+ this(operationType, client, method, url, requestHeaders);
this.buffer = buffer;
this.bufferOffset = bufferOffset;
this.bufferLength = bufferLength;
@@ -152,6 +158,7 @@ public class AbfsRestOperation {
if (hasRequestBody) {
// HttpUrlConnection requires
+ AbfsClientThrottlingIntercept.sendingRequest(operationType);
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
}
@@ -168,6 +175,8 @@ public class AbfsRestOperation {
throw new InvalidAbfsRestOperationException(ex);
}
return false;
+ } finally {
+ AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
}
LOG.debug("HttpRequest: " + httpOperation.toString());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
new file mode 100644
index 0000000..eeea817
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * The REST operation type (Read, Append, Other ).
+ */
+public enum AbfsRestOperationType {
+ CreateFileSystem,
+ GetFileSystemProperties,
+ SetFileSystemProperties,
+ ListPaths,
+ DeleteFileSystem,
+ CreatePath,
+ RenamePath,
+ GetAcl,
+ GetPathProperties,
+ SetAcl,
+ SetOwner,
+ SetPathProperties,
+ SetPermissions,
+ Append,
+ Flush,
+ ReadFile,
+ DeletePath
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97f06b3f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
new file mode 100644
index 0000000..5105b85
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
@@ -0,0 +1,159 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for <code>AbfsClientThrottlingAnalyzer</code>.
+ */
+public class TestAbfsClientThrottlingAnalyzer {
+ private static final int ANALYSIS_PERIOD = 1000;
+ private static final int ANALYSIS_PERIOD_PLUS_10_PERCENT = ANALYSIS_PERIOD
+ + ANALYSIS_PERIOD / 10;
+ private static final long MEGABYTE = 1024 * 1024;
+ private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
+
+ private void sleep(long milliseconds) {
+ try {
+ Thread.sleep(milliseconds);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void fuzzyValidate(long expected, long actual, double percentage) {
+ final double lowerBound = Math.max(expected - percentage / 100 * expected, 0);
+ final double upperBound = expected + percentage / 100 * expected;
+
+ assertTrue(
+ String.format(
+ "The actual value %1$d is not within the expected range: "
+ + "[%2$.2f, %3$.2f].",
+ actual,
+ lowerBound,
+ upperBound),
+ actual >= lowerBound && actual <= upperBound);
+ }
+
+ private void validate(long expected, long actual) {
+ assertEquals(
+ String.format("The actual value %1$d is not the expected value %2$d.",
+ actual,
+ expected),
+ expected, actual);
+ }
+
+ private void validateLessThanOrEqual(long maxExpected, long actual) {
+ assertTrue(
+ String.format(
+ "The actual value %1$d is not less than or equal to the maximum"
+ + " expected value %2$d.",
+ actual,
+ maxExpected),
+ actual < maxExpected);
+ }
+
+ /**
+ * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
+ * never been updated. This validates proper initialization of
+ * ClientThrottlingAnalyzer.
+ */
+ @Test
+ public void testNoMetricUpdatesThenNoWaiting() {
+ AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+ "test",
+ ANALYSIS_PERIOD);
+ validate(0, analyzer.getSleepDuration());
+ sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+ validate(0, analyzer.getSleepDuration());
+ }
+
+ /**
+ * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
+ * only been updated with successful requests.
+ */
+ @Test
+ public void testOnlySuccessThenNoWaiting() {
+ AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+ "test",
+ ANALYSIS_PERIOD);
+ analyzer.addBytesTransferred(8 * MEGABYTE, false);
+ validate(0, analyzer.getSleepDuration());
+ sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+ validate(0, analyzer.getSleepDuration());
+ }
+
+ /**
+ * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+ * only been updated with failed requests. Also ensure that the
+ * sleepDuration decreases over time.
+ */
+ @Test
+ public void testOnlyErrorsAndWaiting() {
+ AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+ "test",
+ ANALYSIS_PERIOD);
+ validate(0, analyzer.getSleepDuration());
+ analyzer.addBytesTransferred(4 * MEGABYTE, true);
+ sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+ final int expectedSleepDuration1 = 1100;
+ validateLessThanOrEqual(expectedSleepDuration1, analyzer.getSleepDuration());
+ sleep(10 * ANALYSIS_PERIOD);
+ final int expectedSleepDuration2 = 900;
+ validateLessThanOrEqual(expectedSleepDuration2, analyzer.getSleepDuration());
+ }
+
+ /**
+ * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+ * only been updated with both successful and failed requests. Also ensure
+ * that the sleepDuration decreases over time.
+ */
+ @Test
+ public void testSuccessAndErrorsAndWaiting() {
+ AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+ "test",
+ ANALYSIS_PERIOD);
+ validate(0, analyzer.getSleepDuration());
+ analyzer.addBytesTransferred(8 * MEGABYTE, false);
+ analyzer.addBytesTransferred(2 * MEGABYTE, true);
+ sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ analyzer.suspendIfNecessary();
+ final int expectedElapsedTime = 126;
+ fuzzyValidate(expectedElapsedTime,
+ timer.elapsedTimeMs(),
+ MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
+ sleep(10 * ANALYSIS_PERIOD);
+ final int expectedSleepDuration = 110;
+ validateLessThanOrEqual(expectedSleepDuration, analyzer.getSleepDuration());
+ }
+
+ /**
+ * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+ * only been updated with many successful and failed requests. Also ensure
+ * that the sleepDuration decreases to zero over time.
+ */
+ @Test
+ public void testManySuccessAndErrorsAndWaiting() {
+ AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+ "test",
+ ANALYSIS_PERIOD);
+ validate(0, analyzer.getSleepDuration());
+ final int numberOfRequests = 20;
+ for (int i = 0; i < numberOfRequests; i++) {
+ analyzer.addBytesTransferred(8 * MEGABYTE, false);
+ analyzer.addBytesTransferred(2 * MEGABYTE, true);
+ }
+ sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ analyzer.suspendIfNecessary();
+ fuzzyValidate(7,
+ timer.elapsedTimeMs(),
+ MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
+ sleep(10 * ANALYSIS_PERIOD);
+ validate(0, analyzer.getSleepDuration());
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org