You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2019/08/28 22:44:18 UTC

[hadoop] 01/09: HADOOP-16242. ABFS: add bufferpool to AbfsOutputStream.

This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit e5bb8498df2b22495ae58829e7125d1377cc765a
Author: Da Zhou <da...@microsoft.com>
AuthorDate: Mon Apr 29 13:27:28 2019 +0100

    HADOOP-16242. ABFS: add bufferpool to AbfsOutputStream.
    
    Contributed by Da Zhou.
---
 .../hadoop/fs/azurebfs/services/AbfsOutputStream.java   | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 5764bcb..679f22e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
+import java.nio.ByteBuffer;
 import java.util.Locale;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -37,6 +38,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
@@ -64,6 +66,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private final ThreadPoolExecutor threadExecutor;
   private final ExecutorCompletionService<Void> completionService;
 
+  /**
+   * Pool of buffers, each the size of an Azure block, kept ready for
+   * reuse. The pool allows reusing buffers instead of allocating new
+   * ones. After the data is sent to the service, the buffer is returned
+   * to the pool.
+   */
+  private final ElasticByteBufferPool byteBufferPool
+          = new ElasticByteBufferPool();
+
   public AbfsOutputStream(
       final AbfsClient client,
       final String path,
@@ -78,7 +89,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     this.lastError = null;
     this.lastFlushOffset = 0;
     this.bufferSize = bufferSize;
-    this.buffer = new byte[bufferSize];
+    this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
     this.bufferIndex = 0;
     this.writeOperations = new ConcurrentLinkedDeque<>();
 
@@ -268,8 +279,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
 
     final byte[] bytes = buffer;
     final int bytesLength = bufferIndex;
-
-    buffer = new byte[bufferSize];
+    buffer = byteBufferPool.getBuffer(false, bufferSize).array();
     bufferIndex = 0;
     final long offset = position;
     position += bytesLength;
@@ -283,6 +293,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
       public Void call() throws Exception {
         client.append(path, offset, bytes, 0,
             bytesLength);
+        byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
         return null;
       }
     });


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org