You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2020/10/14 22:39:32 UTC

[hadoop] 06/09: HADOOP-17166. ABFS: configure output stream thread pool (#2179)

This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f208da286cddebe594c240ed6e4c8c4850f1faeb
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Wed Sep 9 21:11:36 2020 +0530

    HADOOP-17166. ABFS: configure output stream thread pool (#2179)
    
    
    Adds the options to control the size of the per-output-stream threadpool
    when writing data through the abfs connector
    
    * fs.azure.write.max.concurrent.requests
    * fs.azure.write.max.requests.to.queue
    
    Contributed by Bilahari T H
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      | 22 ++++++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  2 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  2 +
 .../fs/azurebfs/services/AbfsOutputStream.java     | 18 ++++-
 .../azurebfs/services/AbfsOutputStreamContext.java | 24 +++++++
 .../hadoop-azure/src/site/markdown/abfs.md         | 13 ++++
 .../azurebfs/services/ITestAbfsOutputStream.java   | 78 ++++++++++++++++++++++
 .../fs/azurebfs/services/TestAbfsOutputStream.java |  7 +-
 8 files changed, 163 insertions(+), 3 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 85bd37a..66d4853 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -86,6 +86,14 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
   private String isNamespaceEnabledAccount;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
+      DefaultValue = -1)
+  private int writeMaxConcurrentRequestCount;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
+      DefaultValue = -1)
+  private int maxWriteRequestsToQueue;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
       MinValue = MIN_BUFFER_SIZE,
       MaxValue = MAX_BUFFER_SIZE,
@@ -822,6 +830,20 @@ public class AbfsConfiguration{
         oauthTokenFetchRetryDeltaBackoff);
   }
 
+  public int getWriteMaxConcurrentRequestCount() {
+    if (this.writeMaxConcurrentRequestCount < 1) {
+      return 4 * Runtime.getRuntime().availableProcessors();
+    }
+    return this.writeMaxConcurrentRequestCount;
+  }
+
+  public int getMaxWriteRequestsToQueue() {
+    if (this.maxWriteRequestsToQueue < 1) {
+      return 2 * getWriteMaxConcurrentRequestCount();
+    }
+    return this.maxWriteRequestsToQueue;
+  }
+
   @VisibleForTesting
   void setReadBufferSize(int bufferSize) {
     this.readBufferSize = bufferSize;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 9861e3a..23d2b5a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -490,6 +490,8 @@ public class AzureBlobFileSystemStore implements Closeable {
             .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
+            .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
+            .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
             .build();
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 5f1ad31..681390c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -52,6 +52,8 @@ public final class ConfigurationKeys {
   public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = "fs.azure.oauth.token.fetch.retry.delta.backoff";
 
   // Read and write buffer sizes defined by the user
+  public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
+  public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
   public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
   public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
   public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 6c1e177..1991638 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -70,6 +70,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private byte[] buffer;
   private int bufferIndex;
   private final int maxConcurrentRequestCount;
+  private final int maxRequestsThatCanBeQueued;
 
   private ConcurrentLinkedDeque<WriteOperation> writeOperations;
   private final ThreadPoolExecutor threadExecutor;
@@ -119,8 +120,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     if (this.isAppendBlob) {
       this.maxConcurrentRequestCount = 1;
     } else {
-      this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+      this.maxConcurrentRequestCount = abfsOutputStreamContext
+          .getWriteMaxConcurrentRequestCount();
     }
+    this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
+        .getMaxWriteRequestsToQueue();
     this.threadExecutor
         = new ThreadPoolExecutor(maxConcurrentRequestCount,
         maxConcurrentRequestCount,
@@ -371,7 +375,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     final long offset = position;
     position += bytesLength;
 
-    if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+    if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
       long start = System.currentTimeMillis();
       waitForTaskToComplete();
       outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
@@ -543,6 +547,16 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     return writeOperations.size();
   }
 
+  @VisibleForTesting
+  int getMaxConcurrentRequestCount() {
+    return this.maxConcurrentRequestCount;
+  }
+
+  @VisibleForTesting
+  int getMaxRequestsThatCanBeQueued() {
+    return maxRequestsThatCanBeQueued;
+  }
+
   /**
    * Appending AbfsOutputStream statistics to base toString().
    *
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 03e4aba..2dce5dc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -33,6 +33,10 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private boolean isAppendBlob;
 
+  private int writeMaxConcurrentRequestCount;
+
+  private int maxWriteRequestsToQueue;
+
   public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -71,6 +75,18 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount(
+      final int writeMaxConcurrentRequestCount) {
+    this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount;
+    return this;
+  }
+
+  public AbfsOutputStreamContext withMaxWriteRequestsToQueue(
+      final int maxWriteRequestsToQueue) {
+    this.maxWriteRequestsToQueue = maxWriteRequestsToQueue;
+    return this;
+  }
+
   public int getWriteBufferSize() {
     return writeBufferSize;
   }
@@ -90,4 +106,12 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
   public boolean isAppendBlob() {
     return isAppendBlob;
   }
+
+  public int getWriteMaxConcurrentRequestCount() {
+    return this.writeMaxConcurrentRequestCount;
+  }
+
+  public int getMaxWriteRequestsToQueue() {
+    return this.maxWriteRequestsToQueue;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 4640bab..79b897b 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -796,6 +796,19 @@ will be -1. To disable readaheads, set this value to 0. If your workload is
  doing only random reads (non-sequential) or you are seeing throttling, you
   may try setting this value to 0.
 
+To run under limited-memory conditions, configure the following options,
+especially when a single process performs many writes at once.
+
+`fs.azure.write.max.concurrent.requests`: Sets the maximum number of concurrent
+ write requests an AbfsOutputStream instance sends to the server at any point in
+ time. Effectively, this is the thread pool size within the
+ AbfsOutputStream instance. Set the value between 1 and 8, both inclusive.
+
+`fs.azure.write.max.requests.to.queue`: Sets the maximum number of write requests
+ that can be queued. Memory consumption of an AbfsOutputStream instance can be
+ tuned with this config, since each queued request holds a buffer. Set the
+ value to 3 or 4 times the value of `fs.azure.write.max.concurrent.requests`.
+
 ### <a name="securityconfigoptions"></a> Security Options
 `fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
 is made true. Irrespective of the flag, AbfsClient will use HTTPS if the secure
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
new file mode 100644
index 0000000..7f91116
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+
+/**
+ * Test the max concurrent request and request queue settings of AbfsOutputStream.
+ */
+public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
+  private static final Path TEST_FILE_PATH = new Path("testfile");
+
+  public ITestAbfsOutputStream() throws Exception {
+    super();
+  }
+
+  @Test
+  public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
+    Configuration conf = getRawConfiguration();
+    final AzureBlobFileSystem fs = getFileSystem(conf);
+    try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
+    AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
+    Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
+        "maxConcurrentRequests should be " + getConfiguration()
+            .getWriteMaxConcurrentRequestCount())
+        .isEqualTo(getConfiguration().getWriteMaxConcurrentRequestCount());
+    Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs(
+        "maxRequestsToQueue should be " + getConfiguration()
+            .getMaxWriteRequestsToQueue())
+        .isEqualTo(getConfiguration().getMaxWriteRequestsToQueue());
+    }
+  }
+
+  @Test
+  public void testMaxRequestsAndQueueCapacity() throws Exception {
+    Configuration conf = getRawConfiguration();
+    int maxConcurrentRequests = 6;
+    int maxRequestsToQueue = 10;
+    conf.set(ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
+        "" + maxConcurrentRequests);
+    conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
+        "" + maxRequestsToQueue);
+    final AzureBlobFileSystem fs = getFileSystem(conf);
+    FSDataOutputStream out = fs.create(TEST_FILE_PATH);
+    AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
+    Assertions.assertThat(stream.getMaxConcurrentRequestCount())
+        .describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
+        .isEqualTo(maxConcurrentRequests);
+    Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
+        .describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
+        .isEqualTo(maxRequestsToQueue);
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index 4105aa1..aab0248 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Random;
@@ -54,13 +55,17 @@ public final class TestAbfsOutputStream {
   private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
             boolean isFlushEnabled,
             boolean disableOutputStreamFlush,
-            boolean isAppendBlob) {
+            boolean isAppendBlob) throws IOException, IllegalAccessException {
+    AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
+        accountName1);
     return new AbfsOutputStreamContext(2)
             .withWriteBufferSize(writeBufferSize)
             .enableFlush(isFlushEnabled)
             .disableOutputStreamFlush(disableOutputStreamFlush)
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
+            .withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
+            .withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
             .build();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org