You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2020/10/14 22:39:32 UTC
[hadoop] 06/09: HADOOP-17166. ABFS: configure output stream thread
pool (#2179)
This is an automated email from the ASF dual-hosted git repository.
tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit f208da286cddebe594c240ed6e4c8c4850f1faeb
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Wed Sep 9 21:11:36 2020 +0530
HADOOP-17166. ABFS: configure output stream thread pool (#2179)
Adds the options to control the size of the per-output-stream threadpool
when writing data through the abfs connector
* fs.azure.write.max.concurrent.requests
* fs.azure.write.max.requests.to.queue
Contributed by Bilahari T H
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 22 ++++++
.../fs/azurebfs/AzureBlobFileSystemStore.java | 2 +
.../fs/azurebfs/constants/ConfigurationKeys.java | 2 +
.../fs/azurebfs/services/AbfsOutputStream.java | 18 ++++-
.../azurebfs/services/AbfsOutputStreamContext.java | 24 +++++++
.../hadoop-azure/src/site/markdown/abfs.md | 13 ++++
.../azurebfs/services/ITestAbfsOutputStream.java | 78 ++++++++++++++++++++++
.../fs/azurebfs/services/TestAbfsOutputStream.java | 7 +-
8 files changed, 163 insertions(+), 3 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 85bd37a..66d4853 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -86,6 +86,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
private String isNamespaceEnabledAccount;
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
+ DefaultValue = -1)
+ private int writeMaxConcurrentRequestCount;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
+ DefaultValue = -1)
+ private int maxWriteRequestsToQueue;
+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
@@ -822,6 +830,20 @@ public class AbfsConfiguration{
oauthTokenFetchRetryDeltaBackoff);
}
+ public int getWriteMaxConcurrentRequestCount() {
+ if (this.writeMaxConcurrentRequestCount < 1) {
+ return 4 * Runtime.getRuntime().availableProcessors();
+ }
+ return this.writeMaxConcurrentRequestCount;
+ }
+
+ public int getMaxWriteRequestsToQueue() {
+ if (this.maxWriteRequestsToQueue < 1) {
+ return 2 * getWriteMaxConcurrentRequestCount();
+ }
+ return this.maxWriteRequestsToQueue;
+ }
+
@VisibleForTesting
void setReadBufferSize(int bufferSize) {
this.readBufferSize = bufferSize;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 9861e3a..23d2b5a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -490,6 +490,8 @@ public class AzureBlobFileSystemStore implements Closeable {
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
+ .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
+ .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.build();
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 5f1ad31..681390c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -52,6 +52,8 @@ public final class ConfigurationKeys {
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = "fs.azure.oauth.token.fetch.retry.delta.backoff";
// Read and write buffer sizes defined by the user
+ public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
+ public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 6c1e177..1991638 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -70,6 +70,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private byte[] buffer;
private int bufferIndex;
private final int maxConcurrentRequestCount;
+ private final int maxRequestsThatCanBeQueued;
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private final ThreadPoolExecutor threadExecutor;
@@ -119,8 +120,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
if (this.isAppendBlob) {
this.maxConcurrentRequestCount = 1;
} else {
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+ this.maxConcurrentRequestCount = abfsOutputStreamContext
+ .getWriteMaxConcurrentRequestCount();
}
+ this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
+ .getMaxWriteRequestsToQueue();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
@@ -371,7 +375,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
final long offset = position;
position += bytesLength;
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
long start = System.currentTimeMillis();
waitForTaskToComplete();
outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
@@ -543,6 +547,16 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
return writeOperations.size();
}
+ @VisibleForTesting
+ int getMaxConcurrentRequestCount() {
+ return this.maxConcurrentRequestCount;
+ }
+
+ @VisibleForTesting
+ int getMaxRequestsThatCanBeQueued() {
+ return maxRequestsThatCanBeQueued;
+ }
+
/**
* Appending AbfsOutputStream statistics to base toString().
*
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 03e4aba..2dce5dc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -33,6 +33,10 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private boolean isAppendBlob;
+ private int writeMaxConcurrentRequestCount;
+
+ private int maxWriteRequestsToQueue;
+
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@@ -71,6 +75,18 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
return this;
}
+ public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount(
+ final int writeMaxConcurrentRequestCount) {
+ this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount;
+ return this;
+ }
+
+ public AbfsOutputStreamContext withMaxWriteRequestsToQueue(
+ final int maxWriteRequestsToQueue) {
+ this.maxWriteRequestsToQueue = maxWriteRequestsToQueue;
+ return this;
+ }
+
public int getWriteBufferSize() {
return writeBufferSize;
}
@@ -90,4 +106,12 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
public boolean isAppendBlob() {
return isAppendBlob;
}
+
+ public int getWriteMaxConcurrentRequestCount() {
+ return this.writeMaxConcurrentRequestCount;
+ }
+
+ public int getMaxWriteRequestsToQueue() {
+ return this.maxWriteRequestsToQueue;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 4640bab..79b897b 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -796,6 +796,19 @@ will be -1. To disable readaheads, set this value to 0. If your workload is
doing only random reads (non-sequential) or you are seeing throttling, you
may try setting this value to 0.
+To run in limited-memory situations, configure the following options, especially
+when there are many concurrent writes from the same process.
+
+`fs.azure.write.max.concurrent.requests`: To set the maximum concurrent
+ write requests from an AbfsOutputStream instance to server at any point of
+ time. Effectively this will be the threadpool size within the
+ AbfsOutputStream instance. Set the value between 1 and 8, both inclusive.
+
+`fs.azure.write.max.requests.to.queue`: To set the maximum write requests
+ that can be queued. Memory consumption of AbfsOutputStream instance can be
+ tuned with this config considering each queued request holds a buffer. Set
+ the value to 3 or 4 times the value set for fs.azure.write.max.concurrent.requests.
+
### <a name="securityconfigoptions"></a> Security Options
`fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
is made true. Irrespective of the flag, AbfsClient will use HTTPS if the secure
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
new file mode 100644
index 0000000..7f91116
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+
+/**
+ * Test the max concurrent request and queue size settings of AbfsOutputStream.
+ */
+public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
+ private static final Path TEST_FILE_PATH = new Path("testfile");
+
+ public ITestAbfsOutputStream() throws Exception {
+ super();
+ }
+
+ @Test
+ public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
+ Configuration conf = getRawConfiguration();
+ final AzureBlobFileSystem fs = getFileSystem(conf);
+ try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
+ AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
+ Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
+ "maxConcurrentRequests should be " + getConfiguration()
+ .getWriteMaxConcurrentRequestCount())
+ .isEqualTo(getConfiguration().getWriteMaxConcurrentRequestCount());
+ Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs(
+ "maxRequestsToQueue should be " + getConfiguration()
+ .getMaxWriteRequestsToQueue())
+ .isEqualTo(getConfiguration().getMaxWriteRequestsToQueue());
+ }
+ }
+
+ @Test
+ public void testMaxRequestsAndQueueCapacity() throws Exception {
+ Configuration conf = getRawConfiguration();
+ int maxConcurrentRequests = 6;
+ int maxRequestsToQueue = 10;
+ conf.set(ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
+ "" + maxConcurrentRequests);
+ conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
+ "" + maxRequestsToQueue);
+ final AzureBlobFileSystem fs = getFileSystem(conf);
+ FSDataOutputStream out = fs.create(TEST_FILE_PATH);
+ AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
+ Assertions.assertThat(stream.getMaxConcurrentRequestCount())
+ .describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
+ .isEqualTo(maxConcurrentRequests);
+ Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
+ .describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
+ .isEqualTo(maxRequestsToQueue);
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index 4105aa1..aab0248 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
@@ -54,13 +55,17 @@ public final class TestAbfsOutputStream {
private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
boolean isFlushEnabled,
boolean disableOutputStreamFlush,
- boolean isAppendBlob) {
+ boolean isAppendBlob) throws IOException, IllegalAccessException {
+ AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
+ accountName1);
return new AbfsOutputStreamContext(2)
.withWriteBufferSize(writeBufferSize)
.enableFlush(isFlushEnabled)
.disableOutputStreamFlush(disableOutputStreamFlush)
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
+ .withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
+ .withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
.build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org