Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/11 17:46:47 UTC

[hadoop] branch trunk updated: HDDS-1173. Fix a data corruption bug in BlockOutputStream. Contributed by Shashikant Banerjee.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b4aa24d  HDDS-1173. Fix a data corruption bug in BlockOutputStream. Contributed by Shashikant Banerjee.
b4aa24d is described below

commit b4aa24d3c5ad1b9309a58795e4b48e567695c4e4
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Mon Mar 11 23:15:49 2019 +0530

    HDDS-1173. Fix a data corruption bug in BlockOutputStream. Contributed by Shashikant Banerjee.
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 232 +++++++++------------
 .../apache/hadoop/hdds/scm/storage/BufferPool.java | 106 ++++++++++
 .../ozone/client/io/BlockOutputStreamEntry.java    |  23 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  21 +-
 .../rpc/TestCloseContainerHandlingByClient.java    |  62 ++++--
 .../commandhandler/TestBlockDeletion.java          |   2 +-
 6 files changed, 279 insertions(+), 167 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 2e156b3..fe41f57 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.UUID;
@@ -87,7 +86,7 @@ public class BlockOutputStream extends OutputStream {
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
-  private List<ByteBuffer> bufferList;
+  private BufferPool bufferPool;
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
@@ -111,8 +110,6 @@ public class BlockOutputStream extends OutputStream {
   // map containing mapping for putBlock logIndex to to flushedDataLength Map.
   private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
 
-  private int currentBufferIndex;
-
   private List<DatanodeDetails> failedServers;
 
   /**
@@ -124,7 +121,7 @@ public class BlockOutputStream extends OutputStream {
    * @param pipeline             pipeline where block will be written
    * @param traceID              container protocol call args
    * @param chunkSize            chunk size
-   * @param bufferList           list of byte buffers
+   * @param bufferPool           pool of buffers
    * @param streamBufferFlushSize flush size
    * @param streamBufferMaxSize   max size of the currentBuffer
    * @param watchTimeout          watch timeout
@@ -135,7 +132,7 @@ public class BlockOutputStream extends OutputStream {
   public BlockOutputStream(BlockID blockID, String key,
       XceiverClientManager xceiverClientManager, Pipeline pipeline,
       String traceID, int chunkSize, long streamBufferFlushSize,
-      long streamBufferMaxSize, long watchTimeout, List<ByteBuffer> bufferList,
+      long streamBufferMaxSize, long watchTimeout, BufferPool bufferPool,
       ChecksumType checksumType, int bytesPerChecksum)
       throws IOException {
     this.blockID = blockID;
@@ -154,7 +151,7 @@ public class BlockOutputStream extends OutputStream {
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
-    this.bufferList = bufferList;
+    this.bufferPool = bufferPool;
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
 
@@ -164,7 +161,6 @@ public class BlockOutputStream extends OutputStream {
     totalAckDataLength = 0;
     futureMap = new ConcurrentHashMap<>();
     totalDataFlushedLength = 0;
-    currentBufferIndex = 0;
     writtenDataLength = 0;
     failedServers = Collections.emptyList();
   }
@@ -181,13 +177,6 @@ public class BlockOutputStream extends OutputStream {
     return writtenDataLength;
   }
 
-  private long computeBufferData() {
-    int dataLength =
-        bufferList.stream().mapToInt(Buffer::position).sum();
-    Preconditions.checkState(dataLength <= streamBufferMaxSize);
-    return dataLength;
-  }
-
   public List<DatanodeDetails> getFailedServers() {
     return failedServers;
   }
@@ -202,6 +191,7 @@ public class BlockOutputStream extends OutputStream {
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
+    checkOpen();
     if (b == null) {
       throw new NullPointerException();
     }
@@ -213,53 +203,40 @@ public class BlockOutputStream extends OutputStream {
       return;
     }
     while (len > 0) {
-      checkOpen();
       int writeLen;
-      allocateBuffer();
-      ByteBuffer currentBuffer = getCurrentBuffer();
+
+      // Allocate a buffer if needed. Each buffer is allocated only once,
+      // on demand, and is then reused across multiple blockOutputStream
+      // entries.
+      ByteBuffer currentBuffer = bufferPool.allocateBufferIfNeeded();
+      int pos = currentBuffer.position();
       writeLen =
-          Math.min(chunkSize - currentBuffer.position() % chunkSize, len);
+          Math.min(chunkSize - pos % chunkSize, len);
       currentBuffer.put(b, off, writeLen);
-      if (currentBuffer.position() % chunkSize == 0) {
-        int pos = currentBuffer.position() - chunkSize;
-        int limit = currentBuffer.position();
-        writeChunk(pos, limit);
+      if (!currentBuffer.hasRemaining()) {
+        writeChunk(currentBuffer);
       }
       off += writeLen;
       len -= writeLen;
       writtenDataLength += writeLen;
-      if (currentBuffer.position() == streamBufferFlushSize) {
+      if (shouldFlush()) {
         totalDataFlushedLength += streamBufferFlushSize;
         handlePartialFlush();
       }
-      long bufferedData = computeBufferData();
-      // Data in the bufferList can not exceed streamBufferMaxSize
-      if (bufferedData == streamBufferMaxSize) {
+      // Data in the bufferPool can not exceed streamBufferMaxSize
+      if (isBufferPoolFull()) {
         handleFullBuffer();
       }
     }
   }
 
-  private ByteBuffer getCurrentBuffer() {
-    ByteBuffer buffer = bufferList.get(currentBufferIndex);
-    if (!buffer.hasRemaining()) {
-      currentBufferIndex =
-          currentBufferIndex < getMaxNumBuffers() - 1 ? ++currentBufferIndex :
-              0;
-    }
-    return bufferList.get(currentBufferIndex);
-  }
-
-  private int getMaxNumBuffers() {
-    return (int)(streamBufferMaxSize/streamBufferFlushSize);
+  private boolean shouldFlush() {
+    return writtenDataLength % streamBufferFlushSize == 0;
   }
 
-  private void allocateBuffer() {
-    for (int i = bufferList.size(); i < getMaxNumBuffers(); i++) {
-      bufferList.add(ByteBuffer.allocate((int)streamBufferFlushSize));
-    }
+  private boolean isBufferPoolFull() {
+    return bufferPool.computeBufferData() == streamBufferMaxSize;
   }
-
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
@@ -272,36 +249,37 @@ public class BlockOutputStream extends OutputStream {
     if (len == 0) {
       return;
     }
-    int off = 0;
-    int pos = off;
+    int count = 0;
+    Preconditions.checkArgument(len <= streamBufferMaxSize);
     while (len > 0) {
       long writeLen;
       writeLen = Math.min(chunkSize, len);
       if (writeLen == chunkSize) {
-        int limit = pos + chunkSize;
-        writeChunk(pos, limit);
+        writeChunk(bufferPool.getBuffer(count));
       }
-      off += writeLen;
       len -= writeLen;
+      count++;
       writtenDataLength += writeLen;
-      if (off % streamBufferFlushSize == 0) {
-        // reset the position to zero as now we wll readng thhe next buffer in
-        // the list
-        pos = 0;
+      if (shouldFlush()) {
+        // a streamBufferFlushSize boundary has been reached while rewriting
+        // the cached data; account for it and trigger the partial flush
         totalDataFlushedLength += streamBufferFlushSize;
         handlePartialFlush();
       }
-      if (computeBufferData() % streamBufferMaxSize == 0) {
+
+      // we should not call isBufferPoolFull here. The buffer pool might
+      // already be full because the whole data is already cached in it; we
+      // should instead check whether writtenDataLength has reached
+      // streamBufferMaxSize before handling the full-buffer condition.
+      if (writtenDataLength == streamBufferMaxSize) {
         handleFullBuffer();
       }
     }
   }
 
   /**
-   * just update the totalAckDataLength. Since we have allocated
-   * the currentBuffer more than the streamBufferMaxSize, we can keep on writing
-   * to the currentBuffer. In case of failure, we will read the data starting
-   * from totalAckDataLength.
+   * just update the totalAckDataLength. In case of failure,
+   * we will read the data starting from totalAckDataLength.
    */
   private void updateFlushIndex(long index) {
     if (!commitIndex2flushedDataMap.isEmpty()) {
@@ -310,13 +288,15 @@ public class BlockOutputStream extends OutputStream {
       LOG.debug("Total data successfully replicated: " + totalAckDataLength);
       futureMap.remove(totalAckDataLength);
       // Flush has been committed to required servers successful.
-      // just swap the bufferList head and tail after clearing.
-      ByteBuffer currentBuffer = bufferList.remove(0);
-      currentBuffer.clear();
-      if (currentBufferIndex != 0) {
-        currentBufferIndex--;
+      // just release the current buffer from the buffer pool.
+
+      // every entry removed from the putBlock future Map signifies
+      // streamBufferFlushSize/chunkSize number of chunks successfully committed.
+      // Release the buffers from the buffer pool to be reused again.
+      int chunkCount = (int) (streamBufferFlushSize / chunkSize);
+      for (int i = 0; i < chunkCount; i++) {
+        bufferPool.releaseBuffer();
       }
-      bufferList.add(currentBuffer);
     }
   }
 
@@ -450,91 +430,85 @@ public class BlockOutputStream extends OutputStream {
   @Override
   public void flush() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
-        && bufferList != null) {
-      checkOpen();
-      int bufferSize = bufferList.size();
-      if (bufferSize > 0) {
-        try {
-          // flush the last chunk data residing on the currentBuffer
-          if (totalDataFlushedLength < writtenDataLength) {
-            ByteBuffer currentBuffer = getCurrentBuffer();
-            int pos = currentBuffer.position() - (currentBuffer.position()
-                % chunkSize);
-            int limit = currentBuffer.position() - pos;
-            writeChunk(pos, currentBuffer.position());
-            totalDataFlushedLength += limit;
-            handlePartialFlush();
-          }
-          waitOnFlushFutures();
-          // just check again if the exception is hit while waiting for the
-          // futures to ensure flush has indeed succeeded
-          checkOpen();
-        } catch (InterruptedException | ExecutionException e) {
-          adjustBuffersOnException();
-          throw new IOException(
-              "Unexpected Storage Container Exception: " + e.toString(), e);
-        }
+        && bufferPool != null && bufferPool.getSize() > 0) {
+      try {
+        handleFlush();
+      } catch (InterruptedException | ExecutionException e) {
+        adjustBuffersOnException();
+        throw new IOException(
+            "Unexpected Storage Container Exception: " + e.toString(), e);
       }
     }
   }
 
-  private void writeChunk(int pos, int limit) throws IOException {
+
+  private void writeChunk(ByteBuffer buffer)
+      throws IOException {
     // Please note : We are not flipping the slice when we write since
     // the slices are pointing the currentBuffer start and end as needed for
     // the chunk write. Also please note, Duplicate does not create a
     // copy of data, it only creates metadata that points to the data
     // stream.
-    ByteBuffer chunk = bufferList.get(currentBufferIndex).duplicate();
-    chunk.position(pos);
-    chunk.limit(limit);
+    ByteBuffer chunk = buffer.duplicate();
+    chunk.position(0);
+    chunk.limit(buffer.position());
     writeChunkToContainer(chunk);
   }
 
+  private void handleFlush()
+      throws IOException, InterruptedException, ExecutionException {
+    checkOpen();
+    // flush the last chunk data residing on the currentBuffer
+    if (totalDataFlushedLength < writtenDataLength) {
+      ByteBuffer currentBuffer = bufferPool.getBuffer();
+      int pos = currentBuffer.position();
+      writeChunk(currentBuffer);
+      totalDataFlushedLength += pos;
+      handlePartialFlush();
+    }
+    waitOnFlushFutures();
+    // just check again if the exception is hit while waiting for the
+    // futures to ensure flush has indeed succeeded
+
+    // irrespective of whether the commitIndex2flushedDataMap is empty
+    // or not, ensure there is no exception set
+    checkOpen();
+
+  }
+
   @Override
   public void close() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
-        && bufferList != null) {
-      int bufferSize = bufferList.size();
-      if (bufferSize > 0) {
-        try {
-          // flush the last chunk data residing on the currentBuffer
-          if (totalDataFlushedLength < writtenDataLength) {
-            ByteBuffer currentBuffer = getCurrentBuffer();
-            int pos = currentBuffer.position() - (currentBuffer.position()
-                % chunkSize);
-            int limit = currentBuffer.position() - pos;
-            writeChunk(pos, currentBuffer.position());
-            totalDataFlushedLength += limit;
-            handlePartialFlush();
-          }
-          waitOnFlushFutures();
-          // irrespective of whether the commitIndex2flushedDataMap is empty
-          // or not, ensure there is no exception set
-          checkOpen();
-          if (!commitIndex2flushedDataMap.isEmpty()) {
-            // wait for the last commit index in the commitIndex2flushedDataMap
-            // to get committed to all or majority of nodes in case timeout
-            // happens.
-            long lastIndex =
-                commitIndex2flushedDataMap.keySet().stream()
-                    .mapToLong(v -> v).max().getAsLong();
-            LOG.debug(
-                "waiting for last flush Index " + lastIndex + " to catch up");
-            watchForCommit(lastIndex);
-          }
-        } catch (InterruptedException | ExecutionException e) {
-          adjustBuffersOnException();
-          throw new IOException(
-              "Unexpected Storage Container Exception: " + e.toString(), e);
-        } finally {
-          cleanup(false);
+        && bufferPool != null && bufferPool.getSize() > 0) {
+      try {
+        handleFlush();
+        if (!commitIndex2flushedDataMap.isEmpty()) {
+          // wait for the last commit index in the commitIndex2flushedDataMap
+          // to get committed to all or majority of nodes in case timeout
+          // happens.
+          long lastIndex =
+              commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
+                  .max().getAsLong();
+          LOG.debug(
+              "waiting for last flush Index " + lastIndex + " to catch up");
+          watchForCommit(lastIndex);
         }
+      } catch (InterruptedException | ExecutionException e) {
+        adjustBuffersOnException();
+        throw new IOException(
+            "Unexpected Storage Container Exception: " + e.toString(), e);
+      } finally {
+        cleanup(false);
       }
-      // clear the currentBuffer
-      bufferList.stream().forEach(ByteBuffer::clear);
+      // TODO: Turn the below buffer empty check on when Standalone pipeline
+      // is removed in the write path in tests
+      // Preconditions.checkArgument(buffer.position() == 0);
+      // bufferPool.checkBufferPoolEmpty();
+
     }
   }
 
+
   private void waitOnFlushFutures()
       throws InterruptedException, ExecutionException {
     CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
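
The rewritten writeChunk(ByteBuffer) above leans on ByteBuffer.duplicate(): the duplicate shares the same backing data and only carries its own position/limit, so carving out the chunk to send never copies bytes. A minimal, self-contained sketch of that behaviour (illustrative values only, not taken from the patch):

    import java.nio.ByteBuffer;

    public class DuplicateSketch {
      public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.put(new byte[] {1, 2, 3, 4});     // 4 bytes written, position == 4

        // Same trick as writeChunk: duplicate() shares the data; only the
        // view's position/limit are adjusted to cover the written bytes.
        ByteBuffer chunk = buffer.duplicate();
        chunk.position(0);
        chunk.limit(buffer.position());

        assert chunk.remaining() == 4;
        buffer.put((byte) 5);                    // further writes through the original
        assert chunk.get(0) == 1;                // do not disturb the chunk view's range
      }
    }
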
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
new file mode 100644
index 0000000..541e6bd
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class creates and manages a pool of n buffers.
+ */
+public class BufferPool {
+
+  private List<ByteBuffer> bufferList;
+  private int currentBufferIndex;
+  private final int bufferSize;
+  private final int capacity;
+
+  public BufferPool(int bufferSize, int capacity) {
+    this.capacity = capacity;
+    this.bufferSize = bufferSize;
+    bufferList = new ArrayList<>(capacity);
+    currentBufferIndex = -1;
+  }
+
+  public ByteBuffer getBuffer() {
+    return currentBufferIndex == -1 ? null : bufferList.get(currentBufferIndex);
+  }
+
+  /**
+   * If currentBufferIndex is less than bufferList.size() - 1, the next
+   * buffer in the list has already been freed up for rewriting; reuse
+   * that buffer in such cases.
+   *
+   * If currentBufferIndex points at the last buffer in the list and the
+   * list is still smaller than the configured capacity, allocate a new
+   * buffer of chunk size and append it to the list.
+   *
+   */
+  public ByteBuffer allocateBufferIfNeeded() {
+    ByteBuffer buffer = getBuffer();
+    if (buffer != null && buffer.hasRemaining()) {
+      return buffer;
+    }
+    if (currentBufferIndex < bufferList.size() - 1) {
+      buffer = getBuffer(currentBufferIndex + 1);
+    } else {
+      buffer = ByteBuffer.allocate(bufferSize);
+      bufferList.add(buffer);
+    }
+    Preconditions.checkArgument(bufferList.size() <= capacity);
+    currentBufferIndex++;
+    // TODO: Turn the below precondition check on when Standalone pipeline
+    // is removed in the write path in tests
+    // Preconditions.checkArgument(buffer.position() == 0);
+    return buffer;
+  }
+
+  public void releaseBuffer() {
+    // always remove from head of the list and append at last
+    ByteBuffer buffer = bufferList.remove(0);
+    buffer.clear();
+    bufferList.add(buffer);
+    currentBufferIndex--;
+  }
+
+  public void clearBufferPool() {
+    bufferList.clear();
+    currentBufferIndex = -1;
+  }
+
+  public void checkBufferPoolEmpty() {
+    Preconditions.checkArgument(computeBufferData() == 0);
+  }
+  public long computeBufferData() {
+    return bufferList.stream().mapToInt(value -> value.position())
+        .sum();
+  }
+
+  public int getSize() {
+    return bufferList.size();
+  }
+
+  ByteBuffer getBuffer(int index) {
+    return bufferList.get(index);
+  }
+
+}
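
A short usage sketch of the new BufferPool as it is exercised by the write path: buffers are chunk-sized and allocated lazily up to the configured capacity, the head of the list is released (cleared and moved to the tail) once its data is acknowledged, and computeBufferData() reports how many bytes are still buffered. The 4-byte/2-buffer sizes below are hypothetical, chosen only to keep the example small:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdds.scm.storage.BufferPool;

    public class BufferPoolSketch {
      public static void main(String[] args) {
        BufferPool pool = new BufferPool(4, 2);             // bufferSize = 4, capacity = 2

        ByteBuffer current = pool.allocateBufferIfNeeded(); // lazily allocates buffer 0
        current.put(new byte[4]);                           // fill one chunk worth of data

        current = pool.allocateBufferIfNeeded();            // buffer 0 is full -> buffer 1
        assert pool.getSize() == 2;
        assert pool.computeBufferData() == 4;               // only buffer 0 holds data so far

        pool.releaseBuffer();                               // acked: clear head, move to tail
        assert pool.computeBufferData() == 0;

        pool.clearBufferPool();                             // drop everything, e.g. on close
      }
    }
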
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 16a825f..b6de8ab 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.ozone.client.io;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -29,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -58,14 +57,14 @@ public final class BlockOutputStreamEntry extends OutputStream {
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
-  private List<ByteBuffer> bufferList;
+  private BufferPool bufferPool;
 
   @SuppressWarnings("parameternumber")
   private BlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientManager xceiverClientManager,
       Pipeline pipeline, String requestId, int chunkSize,
       long length, long streamBufferFlushSize, long streamBufferMaxSize,
-      long watchTimeout, List<ByteBuffer> bufferList,
+      long watchTimeout, BufferPool bufferPool,
       ChecksumType checksumType, int bytesPerChecksum,
       Token<OzoneBlockTokenIdentifier> token) {
     this.outputStream = null;
@@ -81,7 +80,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
-    this.bufferList = bufferList;
+    this.bufferPool = bufferPool;
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
   }
@@ -112,7 +111,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
       this.outputStream =
           new BlockOutputStream(blockID, key, xceiverClientManager,
               pipeline, requestId, chunkSize, streamBufferFlushSize,
-              streamBufferMaxSize, watchTimeout, bufferList, checksumType,
+              streamBufferMaxSize, watchTimeout, bufferPool, checksumType,
               bytesPerChecksum);
     }
   }
@@ -212,7 +211,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
     private long streamBufferFlushSize;
     private long streamBufferMaxSize;
     private long watchTimeout;
-    private List<ByteBuffer> bufferList;
+    private BufferPool bufferPool;
     private Token<OzoneBlockTokenIdentifier> token;
     private ChecksumType checksumType;
     private int bytesPerChecksum;
@@ -278,8 +277,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
       return this;
     }
 
-    public Builder setBufferList(List<ByteBuffer> bffrLst) {
-      this.bufferList = bffrLst;
+    public Builder setbufferPool(BufferPool pool) {
+      this.bufferPool = pool;
       return this;
     }
 
@@ -292,7 +291,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
       return new BlockOutputStreamEntry(blockID, key,
           xceiverClientManager, pipeline, requestId, chunkSize,
           length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
-          bufferList, checksumType, bytesPerChecksum, token);
+          bufferPool, checksumType, bytesPerChecksum, token);
     }
   }
 
@@ -340,8 +339,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
     return watchTimeout;
   }
 
-  public List<ByteBuffer> getBufferList() {
-    return bufferList;
+  public BufferPool getBufferPool() {
+    return bufferPool;
   }
 
   public void setCurrentPosition(long curPosition) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 7035c73..d1acbe1 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -45,7 +46,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Collection;
@@ -83,7 +83,7 @@ public class KeyOutputStream extends OutputStream {
   private final long blockSize;
   private final int bytesPerChecksum;
   private final ChecksumType checksumType;
-  private List<ByteBuffer> bufferList;
+  private final BufferPool bufferPool;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private FileEncryptionInfo feInfo;
   private ExcludeList excludeList;
@@ -104,9 +104,7 @@ public class KeyOutputStream extends OutputStream {
     closed = false;
     streamBufferFlushSize = 0;
     streamBufferMaxSize = 0;
-    bufferList = new ArrayList<>(1);
-    ByteBuffer buffer = ByteBuffer.allocate(1);
-    bufferList.add(buffer);
+    bufferPool = new BufferPool(chunkSize, 1);
     watchTimeout = 0;
     blockSize = 0;
     this.checksumType = ChecksumType.valueOf(
@@ -182,7 +180,8 @@ public class KeyOutputStream extends OutputStream {
     Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
     Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
     Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
-    this.bufferList = new ArrayList<>();
+    this.bufferPool =
+        new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
     this.excludeList = new ExcludeList();
   }
 
@@ -228,7 +227,7 @@ public class KeyOutputStream extends OutputStream {
             .setStreamBufferFlushSize(streamBufferFlushSize)
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setWatchTimeout(watchTimeout)
-            .setBufferList(bufferList)
+            .setbufferPool(bufferPool)
             .setChecksumType(checksumType)
             .setBytesPerChecksum(bytesPerChecksum)
             .setToken(subKeyInfo.getToken());
@@ -272,8 +271,7 @@ public class KeyOutputStream extends OutputStream {
   }
 
   private long computeBufferData() {
-    return bufferList.stream().mapToInt(value -> value.position())
-        .sum();
+    return bufferPool.computeBufferData();
   }
 
   private void handleWrite(byte[] b, int off, long len, boolean retry)
@@ -580,10 +578,7 @@ public class KeyOutputStream extends OutputStream {
     } catch (IOException ioe) {
       throw ioe;
     } finally {
-      if (bufferList != null) {
-        bufferList.stream().forEach(e -> e.clear());
-      }
-      bufferList = null;
+      bufferPool.clearBufferPool();
     }
   }
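
With this change KeyOutputStream derives the pool dimensions from the existing chunk size and stream buffer settings instead of pre-allocating flush-sized ByteBuffers. A hypothetical sizing example (the 16 MB / 64 MB figures are assumptions for illustration, not values asserted by this patch):

    import org.apache.hadoop.hdds.scm.storage.BufferPool;

    public class PoolSizingSketch {
      public static void main(String[] args) {
        int chunkSize = 16 * 1024 * 1024;              // 16 MB chunks (assumed)
        long streamBufferMaxSize = 64L * 1024 * 1024;  // at most 64 MB buffered (assumed)

        // mirrors the constructor call added to KeyOutputStream
        BufferPool bufferPool =
            new BufferPool(chunkSize, (int) (streamBufferMaxSize / chunkSize));

        // at most 4 chunk-sized buffers, allocated lazily and reused as
        // flushed data is acknowledged and released
        System.out.println("pool capacity = " + (streamBufferMaxSize / chunkSize));
      }
    }
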
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index bf4f4d4..396351d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -87,6 +87,7 @@ public class TestCloseContainerHandlingByClient {
     conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
@@ -133,7 +134,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     key.write(data);
     key.flush();
     key.close();
@@ -166,7 +167,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     key.close();
     // read the key from OM again and match the length.The length will still
     // be the equal to the original data size.
@@ -180,12 +181,12 @@ public class TestCloseContainerHandlingByClient {
 
     String keyName = getKeyName();
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, (4 * blockSize));
+        createKey(keyName, ReplicationType.RATIS, (3 * blockSize));
     KeyOutputStream keyOutputStream =
         (KeyOutputStream) key.getOutputStream();
     // With the initial size provided, it should have preallocated 4 blocks
-    Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
-    // write data more than 1 chunk
+    Assert.assertEquals(3, keyOutputStream.getStreamEntries().size());
+    // write data more than 1 block
     byte[] data =
         ContainerTestHelper.getFixedLengthString(keyString, (3 * blockSize))
             .getBytes(UTF_8);
@@ -199,7 +200,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     // write 1 more block worth of data. It will fail and new block will be
     // allocated
     key.write(ContainerTestHelper.getFixedLengthString(keyString, blockSize)
@@ -258,7 +259,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
 
     key.close();
     // read the key from OM again and match the length.The length will still
@@ -301,7 +302,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     // write 3 more chunks worth of data. It will fail and new block will be
     // allocated. This write completes 4 blocks worth of data written to key
     data = Arrays.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
@@ -330,8 +331,7 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertEquals(4 * blockSize, length);
   }
 
-  private void waitForContainerClose(String keyName,
-      OzoneOutputStream outputStream)
+  private void waitForContainerClose(OzoneOutputStream outputStream)
       throws Exception {
     KeyOutputStream keyOutputStream =
         (KeyOutputStream) outputStream.getOutputStream();
@@ -375,7 +375,7 @@ public class TestCloseContainerHandlingByClient {
             .getPipeline(container.getPipelineID());
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(1, datanodes.size());
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     dataString =
         ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
     data = dataString.getBytes(UTF_8);
@@ -421,7 +421,7 @@ public class TestCloseContainerHandlingByClient {
         .build();
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     // Again Write the Data. This will throw an exception which will be handled
     // and new blocks will be allocated
     key.write(data);
@@ -435,4 +435,42 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
     validateData(keyName, dataString.getBytes(UTF_8));
   }
+
+  @Test
+  public void testBlockWrites() throws Exception {
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, 2 * chunkSize)
+            .getBytes(UTF_8);
+    key.write(data1);
+
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .build();
+
+    waitForContainerClose(key);
+    byte[] data2 =
+        ContainerTestHelper.getFixedLengthString(keyString, 3 * chunkSize)
+            .getBytes(UTF_8);
+    key.write(data2);
+    key.flush();
+    key.close();
+    // read the key from OM again and match the length. The length should be
+    // equal to the total size of the data written (data1 + data2).
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    Assert.assertEquals(5 * chunkSize, keyInfo.getDataSize());
+
+    // validate the full key contents: data1 followed by data2
+    String dataString = new String(data1, UTF_8);
+    // concatenate with data2, which was written after the container close
+    String dataString2 = new String(data2, UTF_8);
+    dataString = dataString.concat(dataString2);
+    validateData(keyName, dataString.getBytes(UTF_8));
+  }
+
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 78f7d29..41d87a2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -142,7 +142,7 @@ public class TestBlockDeletion {
 
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).build();
     List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
         om.lookupKey(keyArgs).getKeyLocationVersions();

