You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/04/29 12:27:58 UTC
[hadoop] branch trunk updated: HADOOP-16242. ABFS: add bufferpool
to AbfsOutputStream.
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1cef194 HADOOP-16242. ABFS: add bufferpool to AbfsOutputStream.
1cef194 is described below
commit 1cef194a28086991cd39fb62092d2b2105ece57b
Author: Da Zhou <da...@microsoft.com>
AuthorDate: Mon Apr 29 13:27:28 2019 +0100
HADOOP-16242. ABFS: add bufferpool to AbfsOutputStream.
Contributed by Da Zhou.
---
.../hadoop/fs/azurebfs/services/AbfsOutputStream.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 56fe0b1..f2f0a45 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
+import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
@@ -37,6 +38,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
@@ -64,6 +66,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
+ /**
+ * Pool storing buffers with the size of the Azure block ready for
+ * reuse. The pool allows reusing the blocks instead of allocating new
+ * blocks. After the data is sent to the service, the buffer is returned
+ * to the pool.
+ */
+ private final ElasticByteBufferPool byteBufferPool
+ = new ElasticByteBufferPool();
+
public AbfsOutputStream(
final AbfsClient client,
final String path,
@@ -78,7 +89,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
- this.buffer = new byte[bufferSize];
+ this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
@@ -263,8 +274,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
-
- buffer = new byte[bufferSize];
+ buffer = byteBufferPool.getBuffer(false, bufferSize).array();
bufferIndex = 0;
final long offset = position;
position += bytesLength;
@@ -278,6 +288,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
public Void call() throws Exception {
client.append(path, offset, bytes, 0,
bytesLength);
+ byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
return null;
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org