Posted to commits@ozone.apache.org by ha...@apache.org on 2021/04/01 17:18:16 UTC

[ozone] branch master updated: HDDS-4553. ChunkInputStream should release buffer as soon as last byte in the buffer is read (#2062)

This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b57041e  HDDS-4553. ChunkInputStream should release buffer as soon as last byte in the buffer is read (#2062)
b57041e is described below

commit b57041ea00aef98dd13e64f642825fcf2cbbf78a
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Thu Apr 1 10:17:52 2021 -0700

    HDDS-4553. ChunkInputStream should release buffer as soon as last byte in the buffer is read (#2062)
---
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  | 119 +++++++++++++-------
 .../hdds/scm/storage/DummyChunkInputStream.java    |   5 +-
 .../hdds/scm/storage/TestChunkInputStream.java     |  25 +++--
 .../common/ChunkBufferImplWithByteBufferList.java  |  13 ++-
 .../ozone/common/IncrementalChunkBuffer.java       |  13 ++-
 .../hadoop/ozone/common/utils/BufferUtils.java     |  36 +++++-
 .../keyvalue/impl/FilePerBlockStrategy.java        |  27 +----
 .../keyvalue/impl/FilePerChunkStrategy.java        |  29 +----
 .../keyvalue/interfaces/ChunkManager.java          |  31 +++++
 .../client/rpc/read/TestChunkInputStream.java      | 125 +++++++++++++++++----
 10 files changed, 299 insertions(+), 124 deletions(-)
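
The essence of the change: ChunkInputStream now drops each read buffer the moment its last byte has been served, instead of holding every buffer until the whole chunk is consumed. Below is a minimal, self-contained sketch of that release-as-read pattern for orientation only; the class and method names are made up for the example and are not part of the patch.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReleaseAsReadSketch {
  private final ByteBuffer[] buffers;
  private int bufferIndex = 0;
  private int firstUnreleasedBufferIndex = 0;

  ReleaseAsReadSketch(ByteBuffer[] buffers) {
    this.buffers = buffers;
  }

  /** Read one byte; release the current buffer once its last byte is served. */
  int read() {
    // Skip buffers that are already released (null) or fully consumed.
    while (bufferIndex < buffers.length
        && (buffers[bufferIndex] == null || !buffers[bufferIndex].hasRemaining())) {
      bufferIndex++;
    }
    if (bufferIndex >= buffers.length) {
      return -1;  // EOF
    }
    int b = Byte.toUnsignedInt(buffers[bufferIndex].get());
    if (!buffers[bufferIndex].hasRemaining()) {
      // The last byte of this buffer was just read: drop it right away
      // instead of waiting for the end of the chunk.
      releaseUpTo(bufferIndex);
    }
    return b;
  }

  private void releaseUpTo(int idx) {
    for (int i = firstUnreleasedBufferIndex; i <= idx; i++) {
      buffers[i] = null;  // consumed buffer becomes eligible for GC
    }
    firstUnreleasedBufferIndex = idx + 1;
  }

  public static void main(String[] args) {
    ByteBuffer[] bufs = {
        ByteBuffer.wrap("abc".getBytes(StandardCharsets.UTF_8)),
        ByteBuffer.wrap("de".getBytes(StandardCharsets.UTF_8))
    };
    ReleaseAsReadSketch s = new ReleaseAsReadSketch(bufs);
    int c;
    while ((c = s.read()) != -1) {
      System.out.print((char) c);
    }
    System.out.println();  // prints "abcde"; both buffers are released (null) now
  }
}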

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index dc9123d..0653d1b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.storage;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.Seekable;
@@ -71,8 +70,8 @@ public class ChunkInputStream extends InputStream
   private final Supplier<Pipeline> pipelineSupplier;
   private boolean verifyChecksum;
   private boolean allocated = false;
-  // Buffer to store the chunk data read from the DN container
-  private List<ByteBuffer> buffers;
+  // Buffers to store the chunk data read from the DN container
+  private ByteBuffer[] buffers;
 
   // Index of the buffers corresponding to the current position of the buffers
   private int bufferIndex;
@@ -89,6 +88,9 @@ public class ChunkInputStream extends InputStream
   // of chunk data
   private long bufferOffsetWrtChunkData;
 
+  // Index of the first buffer which has not been released
+  private int firstUnreleasedBufferIndex = 0;
+
   // The number of bytes of chunk data residing in the buffers currently
   private long buffersSize;
 
@@ -133,13 +135,11 @@ public class ChunkInputStream extends InputStream
       // been released by now
       Preconditions.checkState(buffers == null);
     } else {
-      dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+      dataout = Byte.toUnsignedInt(buffers[bufferIndex].get());
     }
 
-    if (chunkStreamEOF()) {
-      // consumer might use getPos to determine EOF,
-      // so release buffers when serving the last byte of data
-      releaseBuffers();
+    if (bufferEOF()) {
+      releaseBuffers(bufferIndex);
     }
 
     return dataout;
@@ -179,15 +179,13 @@ public class ChunkInputStream extends InputStream
         Preconditions.checkState(buffers == null);
         return total != 0 ? total : EOF;
       }
-      buffers.get(bufferIndex).get(b, off + total, available);
+      buffers[bufferIndex].get(b, off + total, available);
       len -= available;
       total += available;
-    }
 
-    if (chunkStreamEOF()) {
-      // smart consumers determine EOF by calling getPos()
-      // so we release buffers when serving the final bytes of data
-      releaseBuffers();
+      if (bufferEOF()) {
+        releaseBuffers(bufferIndex);
+      }
     }
 
     return total;
@@ -233,7 +231,7 @@ public class ChunkInputStream extends InputStream
       // BufferOffset w.r.t to ChunkData + BufferOffset w.r.t buffers +
       // Position of current Buffer
       return bufferOffsetWrtChunkData + bufferOffsets[bufferIndex] +
-          buffers.get(bufferIndex).position();
+          buffers[bufferIndex].position();
     }
     if (buffersAllocated()) {
       return bufferOffsetWrtChunkData + buffersSize;
@@ -289,7 +287,7 @@ public class ChunkInputStream extends InputStream
       }
       if (buffersHaveData()) {
         // Data is available from buffers
-        ByteBuffer bb = buffers.get(bufferIndex);
+        ByteBuffer bb = buffers[bufferIndex];
         return len > bb.remaining() ? bb.remaining() : len;
       } else if (dataRemainingInChunk()) {
         // There is more data in the chunk stream which has not
@@ -370,14 +368,15 @@ public class ChunkInputStream extends InputStream
     buffers = readChunk(readChunkInfo);
     buffersSize = readChunkInfo.getLen();
 
-    bufferOffsets = new long[buffers.size()];
+    bufferOffsets = new long[buffers.length];
     int tempOffset = 0;
-    for (int i = 0; i < buffers.size(); i++) {
+    for (int i = 0; i < buffers.length; i++) {
       bufferOffsets[i] = tempOffset;
-      tempOffset += buffers.get(i).limit();
+      tempOffset += buffers[i].limit();
     }
 
     bufferIndex = 0;
+    firstUnreleasedBufferIndex = 0;
     allocated = true;
   }
 
@@ -385,7 +384,7 @@ public class ChunkInputStream extends InputStream
    * Send RPC call to get the chunk from the container.
    */
   @VisibleForTesting
-  protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo)
+  protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
       throws IOException {
     ReadChunkResponseProto readChunkResponse;
 
@@ -405,13 +404,12 @@ public class ChunkInputStream extends InputStream
     }
 
     if (readChunkResponse.hasData()) {
-      return readChunkResponse.getData().asReadOnlyByteBufferList();
+      return readChunkResponse.getData().asReadOnlyByteBufferList()
+          .toArray(new ByteBuffer[0]);
     } else if (readChunkResponse.hasDataBuffers()) {
       List<ByteString> buffersList = readChunkResponse.getDataBuffers()
           .getBuffersList();
-      return buffersList.stream()
-          .map(ByteString::asReadOnlyByteBuffer)
-          .collect(Collectors.toList());
+      return BufferUtils.getReadOnlyByteBuffersArray(buffersList);
     } else {
       throw new IOException("Unexpected error while reading chunk data " +
           "from container. No data returned.");
@@ -508,21 +506,21 @@ public class ChunkInputStream extends InputStream
   private void adjustBufferPosition(long bufferPosition) {
     // The bufferPosition is w.r.t the current buffers.
     // Adjust the bufferIndex and position to the seeked bufferPosition.
-    if (bufferIndex >= buffers.size()) {
+    if (bufferIndex >= buffers.length) {
       bufferIndex = Arrays.binarySearch(bufferOffsets, bufferPosition);
     } else if (bufferPosition < bufferOffsets[bufferIndex]) {
       bufferIndex = Arrays.binarySearch(bufferOffsets, 0, bufferIndex,
           bufferPosition);
     } else if (bufferPosition >= bufferOffsets[bufferIndex] +
-        buffers.get(bufferIndex).capacity()) {
+        buffers[bufferIndex].capacity()) {
       bufferIndex = Arrays.binarySearch(bufferOffsets, bufferIndex + 1,
-          buffers.size(), bufferPosition);
+          buffers.length, bufferPosition);
     }
     if (bufferIndex < 0) {
       bufferIndex = -bufferIndex - 2;
     }
 
-    buffers.get(bufferIndex).position(
+    buffers[bufferIndex].position(
         (int) (bufferPosition - bufferOffsets[bufferIndex]));
 
     // Reset buffers > bufferIndex to position 0. We do this to reset any
@@ -531,8 +529,8 @@ public class ChunkInputStream extends InputStream
     // not required for this read. If a seek was done to a position in the
     // previous indices, the buffer position reset would be performed in the
     // seek call.
-    for (int i = bufferIndex + 1; i < buffers.size(); i++) {
-      buffers.get(i).position(0);
+    for (int i = bufferIndex + 1; i < buffers.length; i++) {
+      buffers[i].position(0);
     }
 
     // Reset the chunkPosition as chunk stream has been initialized i.e. the
@@ -545,7 +543,7 @@ public class ChunkInputStream extends InputStream
    */
   @VisibleForTesting
   protected boolean buffersAllocated() {
-    return buffers != null && !buffers.isEmpty();
+    return buffers != null && buffers.length > 0;
   }
 
   /**
@@ -556,8 +554,9 @@ public class ChunkInputStream extends InputStream
     boolean hasData = false;
 
     if (buffersAllocated()) {
-      while (bufferIndex < (buffers.size())) {
-        if (buffers.get(bufferIndex).hasRemaining()) {
+      while (bufferIndex < (buffers.length)) {
+        if (buffers[bufferIndex] != null &&
+            buffers[bufferIndex].hasRemaining()) {
           // current buffer has data
           hasData = true;
           break;
@@ -565,7 +564,7 @@ public class ChunkInputStream extends InputStream
           if (buffersRemaining()) {
             // move to next available buffer
             ++bufferIndex;
-            Preconditions.checkState(bufferIndex < buffers.size());
+            Preconditions.checkState(bufferIndex < buffers.length);
           } else {
             // no more buffers remaining
             break;
@@ -578,7 +577,7 @@ public class ChunkInputStream extends InputStream
   }
 
   private boolean buffersRemaining() {
-    return (bufferIndex < (buffers.size() - 1));
+    return (bufferIndex < (buffers.length - 1));
   }
 
   /**
@@ -588,7 +587,10 @@ public class ChunkInputStream extends InputStream
     // Check if buffers have been allocated
     if (buffersAllocated()) {
       // Check if the current buffers cover the input position
-      return pos >= bufferOffsetWrtChunkData &&
+      // Released buffers should not be considered when checking if position
+      // is available
+      return pos >= bufferOffsetWrtChunkData +
+          bufferOffsets[firstUnreleasedBufferIndex] &&
           pos < bufferOffsetWrtChunkData + buffersSize;
     }
     return false;
@@ -610,6 +612,21 @@ public class ChunkInputStream extends InputStream
   }
 
   /**
+   * Check if the current buffer has been read to the end.
+   */
+  private boolean bufferEOF() {
+    if (!allocated) {
+      // Chunk data has not been read yet
+      return false;
+    }
+
+    if (!buffers[bufferIndex].hasRemaining()) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
    * Check if end of chunkStream has been reached.
    */
   private boolean chunkStreamEOF() {
@@ -628,12 +645,38 @@ public class ChunkInputStream extends InputStream
     }
   }
 
+
+  /**
+   * Release the buffers up to the given index.
+   * @param releaseUptoBufferIndex bufferIndex (inclusive) up to which the
+   *                               buffers must be released
+   */
+  private void releaseBuffers(int releaseUptoBufferIndex) {
+    if (releaseUptoBufferIndex == buffers.length - 1) {
+      // Before releasing all the buffers, if chunk EOF is not reached, then
+      // chunkPosition should be set to point to the last position of the
+      // buffers. This should be done so that getPos() can return the current
+      // chunk position
+      chunkPosition = bufferOffsetWrtChunkData +
+          bufferOffsets[releaseUptoBufferIndex] +
+          buffers[releaseUptoBufferIndex].capacity();
+      // Release all the buffers
+      releaseBuffers();
+    } else {
+      for (int i = 0; i <= releaseUptoBufferIndex; i++) {
+        buffers[i] = null;
+      }
+      firstUnreleasedBufferIndex = releaseUptoBufferIndex + 1;
+    }
+  }
+
   /**
    * If EOF is reached, release the buffers.
    */
   private void releaseBuffers() {
     buffers = null;
     bufferIndex = 0;
+    firstUnreleasedBufferIndex = 0;
     // We should not reset bufferOffsetWrtChunkData and buffersSize here
     // because when getPos() is called in chunkStreamEOF() we use these
     // values and determine whether chunk is read completely or not.
@@ -671,7 +714,7 @@ public class ChunkInputStream extends InputStream
   }
 
   @VisibleForTesting
-  public List<ByteBuffer> getCachedBuffers() {
-    return buffers;
+  public ByteBuffer[] getCachedBuffers() {
+    return BufferUtils.getReadOnlyByteBuffers(buffers);
   }
 }
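
The seek path above (adjustBufferPosition) relies on Arrays.binarySearch over bufferOffsets to find which buffer covers a requested position. A small standalone sketch of that lookup, including the "-result - 2" adjustment for positions that fall inside a buffer rather than at its start; the names here are illustrative, not from the patch.

import java.util.Arrays;

public class BufferOffsetLookupSketch {

  /** Map a position within the cached data to the index of the buffer holding it. */
  static int bufferIndexFor(long[] bufferOffsets, long position) {
    int idx = Arrays.binarySearch(bufferOffsets, position);
    if (idx < 0) {
      // binarySearch returned (-insertionPoint - 1); the position falls
      // inside the buffer that starts just before the insertion point.
      idx = -idx - 2;
    }
    return idx;
  }

  public static void main(String[] args) {
    long[] offsets = {0, 16, 32, 48};  // four 16-byte buffers
    System.out.println(bufferIndexFor(offsets, 16));  // 1 (exact buffer start)
    System.out.println(bufferIndexFor(offsets, 20));  // 1 (inside buffer 1)
    System.out.println(bufferIndexFor(offsets, 47));  // 2
  }
}
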
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
index 8453210..78d0c05 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
@@ -50,7 +50,7 @@ public class DummyChunkInputStream extends ChunkInputStream {
   }
 
   @Override
-  protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
+  protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) {
     int offset = (int) readChunkInfo.getOffset();
     int remainingToRead = (int) readChunkInfo.getLen();
 
@@ -72,7 +72,8 @@ public class DummyChunkInputStream extends ChunkInputStream {
       remainingToRead -= bufferLen;
     }
 
-    return BufferUtils.getReadOnlyByteBuffers(readByteBuffers);
+    return BufferUtils.getReadOnlyByteBuffers(readByteBuffers)
+        .toArray(new ByteBuffer[0]);
   }
 
   @Override
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
index f6d9b8d..b06b702 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
@@ -172,16 +172,26 @@ public class TestChunkInputStream {
     // buffers and the chunkPosition should be reset to -1.
     Assert.assertEquals(-1, chunkStream.getChunkPosition());
 
-    // Seek to a position within the current buffers. Current buffers contain
-    // data from index 20 to 59. ChunkPosition should still not be used to
-    // set the position.
-    seekAndVerify(35);
+    // Only the last BYTES_PER_CHECKSUM will be cached in the buffers as
+    // buffers are released after each checksum boundary is read. So the
+    // buffers should contain data from index 40 to 59.
+    // Seek to a position within the cached buffers. ChunkPosition should
+    // still not be used to set the position.
+    seekAndVerify(45);
     Assert.assertEquals(-1, chunkStream.getChunkPosition());
 
-    // Seek to a position outside the current buffers. In this case, the
+    // Seek to a position outside the current cached buffers. In this case, the
     // chunkPosition should be updated to the seeked position.
     seekAndVerify(75);
     Assert.assertEquals(75, chunkStream.getChunkPosition());
+
+    // Reading up to the checksum boundary should release all the buffers and
+    // hence update chunkPosition to the current position in the chunk.
+    seekAndVerify(25);
+    b = new byte[15];
+    chunkStream.read(b, 0, 15);
+    matchWithInputData(b, 25, 15);
+    Assert.assertEquals(40, chunkStream.getChunkPosition());
   }
 
   @Test
@@ -229,8 +239,9 @@ public class TestChunkInputStream {
     ChunkInputStream subject = new ChunkInputStream(chunkInfo, null,
         clientFactory, pipelineRef::get, false, null) {
       @Override
-      protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
-        return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList();
+      protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) {
+        return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList()
+            .toArray(new ByteBuffer[0]);
       }
     };
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index 94ad552..f3f3173 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -218,13 +217,21 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
 
   @Override
   public ByteString toByteStringImpl(Function<ByteBuffer, ByteString> f) {
-    return buffers.stream().map(f).reduce(ByteString.EMPTY, ByteString::concat);
+    ByteString result = ByteString.EMPTY;
+    for (ByteBuffer buffer : buffers) {
+      result = result.concat(f.apply(buffer));
+    }
+    return result;
   }
 
   @Override
   public List<ByteString> toByteStringListImpl(
       Function<ByteBuffer, ByteString> f) {
-    return buffers.stream().map(f).collect(Collectors.toList());
+    List<ByteString> byteStringList = new ArrayList<>();
+    for (ByteBuffer buffer : buffers) {
+      byteStringList.add(f.apply(buffer));
+    }
+    return byteStringList;
   }
 
   @Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index ce7b1eb..7622ffc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.common;
 
 import com.google.common.base.Preconditions;
-import java.util.stream.Collectors;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -271,13 +270,21 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
 
   @Override
   public ByteString toByteStringImpl(Function<ByteBuffer, ByteString> f) {
-    return buffers.stream().map(f).reduce(ByteString.EMPTY, ByteString::concat);
+    ByteString result = ByteString.EMPTY;
+    for (ByteBuffer buffer : buffers) {
+      result = result.concat(f.apply(buffer));
+    }
+    return result;
   }
 
   @Override
   public List<ByteString> toByteStringListImpl(
       Function<ByteBuffer, ByteString> f) {
-    return buffers.stream().map(f).collect(Collectors.toList());
+    List<ByteString> byteStringList = new ArrayList<>();
+    for (ByteBuffer buffer : buffers) {
+      byteStringList.add(f.apply(buffer));
+    }
+    return byteStringList;
   }
 
   @Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
index a8be69d..61e96fb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -71,8 +71,36 @@ public final class BufferUtils {
     return buffers;
   }
 
+  /**
+   * Return a read only copy of ByteBuffer array.
+   */
+  public static ByteBuffer[] getReadOnlyByteBuffers(
+      ByteBuffer[] byteBuffers) {
+    if (byteBuffers == null) {
+      return null;
+    }
+    ByteBuffer[] readOnlyBuffers = new ByteBuffer[byteBuffers.length];
+    for (int i = 0; i < byteBuffers.length; i++) {
+      readOnlyBuffers[i] = byteBuffers[i] == null ?
+          null : byteBuffers[i].asReadOnlyBuffer();
+    }
+    return readOnlyBuffers;
+  }
+
+  /**
+   * Return a read only ByteBuffer array for the input ByteStrings list.
+   */
+  public static ByteBuffer[] getReadOnlyByteBuffersArray(
+      List<ByteString> byteStrings) {
+    return getReadOnlyByteBuffers(byteStrings).toArray(new ByteBuffer[0]);
+  }
+
   public static ByteString concatByteStrings(List<ByteString> byteStrings) {
-    return byteStrings.stream().reduce(ByteString::concat).orElse(null);
+    ByteString result = ByteString.EMPTY;
+    for (ByteString byteString : byteStrings) {
+      result = result.concat(byteString);
+    }
+    return result;
   }
 
   /**
@@ -96,4 +124,10 @@ public final class BufferUtils {
   public static int getNumberOfBins(long numElements, long maxElementsPerBin) {
     return (int) Math.ceil((double) numElements / (double) maxElementsPerBin);
   }
+
+  public static void clearBuffers(ByteBuffer[] byteBuffers) {
+    for (ByteBuffer buffer : byteBuffers) {
+      buffer.clear();
+    }
+  }
 }
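
A usage sketch of the helpers this file gains. It assumes the hdds-common module and the ratis-thirdparty protobuf ByteString are on the classpath; the surrounding class is made up for illustration.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

public class BufferUtilsUsageSketch {
  public static void main(String[] args) {
    // Read-only copy of a buffer array that may already contain released
    // (null) slots, as returned by ChunkInputStream#getCachedBuffers.
    ByteBuffer[] cached = {ByteBuffer.allocate(4), null};
    ByteBuffer[] readOnly = BufferUtils.getReadOnlyByteBuffers(cached);
    System.out.println(readOnly[0].isReadOnly());  // true
    System.out.println(readOnly[1]);               // null (released slot preserved)

    // Read-only ByteBuffer[] straight from a ByteString list, as used by
    // ChunkInputStream#readChunk for the DataBuffers response.
    List<ByteString> strings =
        Arrays.asList(ByteString.copyFromUtf8("ab"), ByteString.copyFromUtf8("cd"));
    ByteBuffer[] fromStrings = BufferUtils.getReadOnlyByteBuffersArray(strings);
    System.out.println(fromStrings.length);        // 2

    // Reset reusable buffers between read attempts, as FilePerChunkStrategy
    // now does instead of discarding the array.
    ByteBuffer[] reusable = {ByteBuffer.allocate(8)};
    reusable[0].putInt(42);
    BufferUtils.clearBuffers(reusable);
    System.out.println(reusable[0].position());    // 0
  }
}
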
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 925ef71..0594e17 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -153,30 +152,8 @@ public class FilePerBlockStrategy implements ChunkManager {
 
     long len = info.getLen();
     long offset = info.getOffset();
-
-    long bufferCapacity = 0;
-    if (info.isReadDataIntoSingleBuffer()) {
-      // Older client - read all chunk data into one single buffer.
-      bufferCapacity = len;
-    } else {
-      // Set buffer capacity to checksum boundary size so that each buffer
-      // corresponds to one checksum. If checksum is NONE, then set buffer
-      // capacity to default (OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = 64KB).
-      ChecksumData checksumData = info.getChecksumData();
-
-      if (checksumData != null) {
-        if (checksumData.getChecksumType() ==
-            ContainerProtos.ChecksumType.NONE) {
-          bufferCapacity = defaultReadBufferCapacity;
-        } else {
-          bufferCapacity = checksumData.getBytesPerChecksum();
-        }
-      }
-    }
-    // If the buffer capacity is 0, set all the data into one ByteBuffer
-    if (bufferCapacity == 0) {
-      bufferCapacity = len;
-    }
+    long bufferCapacity =  ChunkManager.getBufferCapacityForChunkRead(info,
+        defaultReadBufferCapacity);
 
     ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len,
         bufferCapacity);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
index d7f994e..82ac4e8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -228,30 +227,8 @@ public class FilePerChunkStrategy implements ChunkManager {
     }
 
     long len = info.getLen();
-
-    long bufferCapacity = 0;
-    if (info.isReadDataIntoSingleBuffer()) {
-      // Older client - read all chunk data into one single buffer.
-      bufferCapacity = len;
-    } else {
-      // Set buffer capacity to checksum boundary size so that each buffer
-      // corresponds to one checksum. If checksum is NONE, then set buffer
-      // capacity to default (OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = 64KB).
-      ChecksumData checksumData = info.getChecksumData();
-
-      if (checksumData != null) {
-        if (checksumData.getChecksumType() ==
-            ContainerProtos.ChecksumType.NONE) {
-          bufferCapacity = defaultReadBufferCapacity;
-        } else {
-          bufferCapacity = checksumData.getBytesPerChecksum();
-        }
-      }
-    }
-    // If the buffer capacity is 0, set all the data into one ByteBuffer
-    if (bufferCapacity == 0) {
-      bufferCapacity = len;
-    }
+    long bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
+        defaultReadBufferCapacity);
 
     ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len,
         bufferCapacity);
@@ -296,7 +273,7 @@ public class FilePerChunkStrategy implements ChunkManager {
         if (ex.getResult() != UNABLE_TO_FIND_CHUNK) {
           throw ex;
         }
-        dataBuffers = null;
+        BufferUtils.clearBuffers(dataBuffers);
       }
     }
     throw new StorageContainerException(
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index 8db72f7..15ff9d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.ozone.container.keyvalue.interfaces;
  */
 
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -101,4 +103,33 @@ public interface ChunkManager {
       BlockData blockData) throws IOException {
     // no-op
   }
+
+  static long getBufferCapacityForChunkRead(ChunkInfo chunkInfo,
+      long defaultReadBufferCapacity) {
+    long bufferCapacity = 0;
+    if (chunkInfo.isReadDataIntoSingleBuffer()) {
+      // Older client - read all chunk data into one single buffer.
+      bufferCapacity = chunkInfo.getLen();
+    } else {
+      // Set buffer capacity to checksum boundary size so that each buffer
+      // corresponds to one checksum. If checksum is NONE, then set buffer
+      // capacity to default (OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = 64KB).
+      ChecksumData checksumData = chunkInfo.getChecksumData();
+
+      if (checksumData != null) {
+        if (checksumData.getChecksumType() ==
+            ContainerProtos.ChecksumType.NONE) {
+          bufferCapacity = defaultReadBufferCapacity;
+        } else {
+          bufferCapacity = checksumData.getBytesPerChecksum();
+        }
+      }
+    }
+    // If the buffer capacity is 0, set all the data into one ByteBuffer
+    if (bufferCapacity == 0) {
+      bufferCapacity = chunkInfo.getLen();
+    }
+
+    return bufferCapacity;
+  }
 }
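
The decision logic of getBufferCapacityForChunkRead, restated as a standalone sketch with plain parameters instead of ChunkInfo/ChecksumData so it compiles without the container-service classes; names and sample sizes are illustrative only.

public class BufferCapacitySketch {

  static long bufferCapacityForRead(long chunkLen, boolean singleBufferClient,
      boolean hasChecksumData, boolean checksumTypeNone, long bytesPerChecksum,
      long defaultReadBufferCapacity) {
    long capacity = 0;
    if (singleBufferClient) {
      capacity = chunkLen;             // older client: one buffer for the whole chunk
    } else if (hasChecksumData) {
      capacity = checksumTypeNone
          ? defaultReadBufferCapacity  // ChecksumType.NONE: fall back to the default
          : bytesPerChecksum;          // one buffer per checksum boundary
    }
    if (capacity == 0) {
      capacity = chunkLen;             // nothing else applied: one buffer
    }
    return capacity;
  }

  public static void main(String[] args) {
    long chunk = 4L * 1024 * 1024, kb64 = 64 * 1024, kb16 = 16 * 1024;
    System.out.println(bufferCapacityForRead(chunk, false, true, false, kb16, kb64)); // 16384
    System.out.println(bufferCapacityForRead(chunk, false, true, true, kb16, kb64));  // 65536
    System.out.println(bufferCapacityForRead(chunk, true, true, false, kb16, kb64));  // 4194304
  }
}
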
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
index 2ad91a7..fc051c8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.client.rpc.read;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.List;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
@@ -56,12 +55,12 @@ public class TestChunkInputStream extends TestInputStreamBase {
     // To read 1 byte of chunk data, ChunkInputStream should get one full
     // checksum boundary worth of data from Container and store it in buffers.
     chunk0Stream.read(new byte[1]);
-    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1,
+    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
         BYTES_PER_CHECKSUM);
 
     // Read > checksum boundary of data from chunk0
     int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
-    byte[] readData = readDataFromChunk(chunk0Stream, readDataLen, 0);
+    byte[] readData = readDataFromChunk(chunk0Stream, 0, readDataLen);
     validateData(inputData, 0, readData);
 
     // The first checksum boundary size of data was already existing in the
@@ -69,52 +68,140 @@ public class TestChunkInputStream extends TestInputStreamBase {
     // boundary size of data will be fetched again to read the remaining data.
     // Hence there should be 1 checksum boundary size of data stored in the
     // ChunkStreams buffers at the end of the read.
-    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1,
+    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
         BYTES_PER_CHECKSUM);
 
     // Seek to a position in the third checksum boundary (so that current
     // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
     // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
-    // data being read into the buffers. There should be 2 buffers each with
-    // BYTES_PER_CHECKSUM capacity.
+    // data being read into the buffers. There should be 2 buffers in the
+    // stream but the first buffer should be released after it is read
+    // and the second buffer should have BYTES_PER_CHECKSUM capacity.
     readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
     int offset = 2 * BYTES_PER_CHECKSUM + 1;
-    readData = readDataFromChunk(chunk0Stream, readDataLen, offset);
+    readData = readDataFromChunk(chunk0Stream, offset, readDataLen);
     validateData(inputData, offset, readData);
-    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2,
+    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2, 1,
         BYTES_PER_CHECKSUM);
 
 
-    // Read the full chunk data -1 and verify that all chunk data is read into
-    // buffers. We read CHUNK_SIZE - 1 as otherwise the buffers will be
+    // Read the full chunk data - 1 and verify that all chunk data is read into
+    // buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers will be
     // released once all chunk data is read.
-    readData = readDataFromChunk(chunk0Stream, CHUNK_SIZE - 1, 0);
+    readData = readDataFromChunk(chunk0Stream, 0, CHUNK_SIZE - 1);
     validateData(inputData, 0, readData);
+    int expectedNumBuffers = CHUNK_SIZE / BYTES_PER_CHECKSUM;
     checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
-        CHUNK_SIZE / BYTES_PER_CHECKSUM, BYTES_PER_CHECKSUM);
+        expectedNumBuffers, expectedNumBuffers - 1, BYTES_PER_CHECKSUM);
 
     // Read the last byte of chunk and verify that the buffers are released.
     chunk0Stream.read(new byte[1]);
     Assert.assertNull("ChunkInputStream did not release buffers after " +
         "reaching EOF.", chunk0Stream.getCachedBuffers());
+  }
+
+  /**
+   * Test that ChunkInputStream buffers are released as soon as the last byte
+   * of the buffer is read.
+   */
+  @Test
+  public void testBufferRelease() throws Exception {
+    String keyName = getNewKeyName();
+    int dataLength = CHUNK_SIZE;
+    byte[] inputData = writeRandomBytes(keyName, dataLength);
 
+    try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
+
+      BlockInputStream block0Stream = keyInputStream.getBlockStreams().get(0);
+      block0Stream.initialize();
+
+      ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);
+
+      // Read checksum boundary - 1 bytes of data
+      int readDataLen = BYTES_PER_CHECKSUM - 1;
+      byte[] readData = readDataFromChunk(chunk0Stream, 0, readDataLen);
+      validateData(inputData, 0, readData);
+
+      // There should be 1 byte of data remaining in the buffer which is not
+      // yet read. Hence, the buffer should not be released.
+      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
+          1, 0, BYTES_PER_CHECKSUM);
+      Assert.assertEquals(1, chunk0Stream.getCachedBuffers()[0].remaining());
+
+      // Reading the last byte in the buffer should result in all the buffers
+      // being released.
+      readData = readDataFromChunk(chunk0Stream, 1);
+      validateData(inputData, readDataLen, readData);
+      Assert
+          .assertNull("Chunk stream buffers not released after last byte is " +
+              "read", chunk0Stream.getCachedBuffers());
+
+      // Read more data to get the data till the next checksum boundary.
+      readDataLen = BYTES_PER_CHECKSUM / 2;
+      readData = readDataFromChunk(chunk0Stream, readDataLen);
+      // There should be one buffer and the buffer should not be released as
+      // there is data pending to be read from the buffer
+      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
+          BYTES_PER_CHECKSUM);
+      ByteBuffer lastCachedBuffer = chunk0Stream.getCachedBuffers()[0];
+      Assert.assertEquals(BYTES_PER_CHECKSUM - readDataLen,
+          lastCachedBuffer.remaining());
+
+      // Read more than the remaining data in buffer (but less than the next
+      // checksum boundary).
+      int position = (int) chunk0Stream.getPos();
+      readDataLen = lastCachedBuffer.remaining() + BYTES_PER_CHECKSUM / 2;
+      readData = readDataFromChunk(chunk0Stream, readDataLen);
+      validateData(inputData, position, readData);
+      // After reading the remaining data in the buffer, the buffer should be
+      // released and next checksum size of data must be read into the buffers
+      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
+          BYTES_PER_CHECKSUM);
+      // Verify that the previously cached buffer is released by comparing it
+      // with the current cached buffer
+      Assert.assertNotEquals(lastCachedBuffer,
+          chunk0Stream.getCachedBuffers()[0]);
+    }
   }
 
   private byte[] readDataFromChunk(ChunkInputStream chunkInputStream,
-      int readDataLength, int offset) throws IOException {
+      int offset, int readDataLength) throws IOException {
     byte[] readData = new byte[readDataLength];
     chunkInputStream.seek(offset);
     chunkInputStream.read(readData, 0, readDataLength);
     return readData;
   }
 
-  private void checkBufferSizeAndCapacity(List<ByteBuffer> buffers,
-      int expectedNumBuffers, long expectedBufferCapacity) {
+  private byte[] readDataFromChunk(ChunkInputStream chunkInputStream,
+      int readDataLength) throws IOException {
+    byte[] readData = new byte[readDataLength];
+    chunkInputStream.read(readData, 0, readDataLength);
+    return readData;
+  }
+
+  /**
+   * Verify number of buffers and their capacities.
+   * @param buffers chunk stream buffers
+   * @param expectedNumBuffers expected number of buffers
+   * @param numReleasedBuffers first numReleasedBuffers are expected to
+   *                           be released and hence null
+   * @param expectedBufferCapacity expected buffer capacity of unreleased
+   *                               buffers
+   */
+  private void checkBufferSizeAndCapacity(ByteBuffer[] buffers,
+      int expectedNumBuffers, int numReleasedBuffers,
+      long expectedBufferCapacity) {
     Assert.assertEquals("ChunkInputStream does not have expected number of " +
-        "ByteBuffers", expectedNumBuffers, buffers.size());
-    for (ByteBuffer buffer : buffers) {
-      Assert.assertEquals("ChunkInputStream ByteBuffer capacity is wrong",
-          expectedBufferCapacity, buffer.capacity());
+        "ByteBuffers", expectedNumBuffers, buffers.length);
+    for (int i = 0; i < buffers.length; i++) {
+      if (i <= numReleasedBuffers - 1) {
+        // This buffer should have been released and hence null
+        Assert.assertNull("ChunkInputStream Buffer not released after being " +
+            "read", buffers[i]);
+      } else {
+        Assert.assertEquals("ChunkInputStream ByteBuffer capacity is wrong",
+            expectedBufferCapacity, buffers[i].capacity());
+      }
     }
   }
 }
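
For reference, the buffer-count arithmetic the assertions above depend on, as a tiny standalone sketch; the sample sizes below are hypothetical, and the test uses its own CHUNK_SIZE and BYTES_PER_CHECKSUM constants.

public class BufferCountSketch {

  /** ceil(len / bytesPerChecksum), the same formula as BufferUtils.getNumberOfBins. */
  static int numberOfBuffers(long len, long bytesPerChecksum) {
    return (int) Math.ceil((double) len / (double) bytesPerChecksum);
  }

  public static void main(String[] args) {
    System.out.println(numberOfBuffers(1024 * 1024, 16 * 1024));  // 64
    System.out.println(numberOfBuffers(100, 64));                 // 2
  }
}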

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org