You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2019/06/07 02:44:46 UTC

[hadoop] branch trunk updated: HDDS-1496. Support partial chunk reads and checksum verification (#804)

This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a91d24f  HDDS-1496. Support partial chunk reads and checksum verification (#804)
a91d24f is described below

commit a91d24fea45c2d269fabe46d43d5d4156ba47e1c
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Thu Jun 6 19:44:40 2019 -0700

    HDDS-1496. Support partial chunk reads and checksum verification (#804)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  | 634 +++++++++------------
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  | 546 ++++++++++++++++++
 .../hdds/scm/storage/TestBlockInputStream.java     | 251 ++++----
 .../hdds/scm/storage/TestChunkInputStream.java     | 224 ++++++++
 .../org/apache/hadoop/ozone/common/Checksum.java   |  29 +-
 .../apache/hadoop/ozone/common/ChecksumData.java   |  37 +-
 .../hadoop/ozone/client/io/KeyInputStream.java     | 380 +++++-------
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |   4 +-
 .../web/storage/DistributedStorageHandler.java     |   3 +-
 .../apache/hadoop/ozone/om/TestChunkStreams.java   |  11 +-
 10 files changed, 1377 insertions(+), 742 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 82fb106..bccbc9b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -19,470 +19,370 @@
 package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.OzoneChecksumException;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadChunkResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 /**
- * An {@link InputStream} used by the REST service in combination with the
- * SCMClient to read the value of a key from a sequence
- * of container chunks.  All bytes of the key value are stored in container
- * chunks.  Each chunk may contain multiple underlying {@link ByteBuffer}
- * instances.  This class encapsulates all state management for iterating
- * through the sequence of chunks and the sequence of buffers within each chunk.
+ * An {@link InputStream} called from KeyInputStream to read a block from the
+ * container.
+ * This class encapsulates all state management for iterating
+ * through the sequence of chunks through {@link ChunkInputStream}.
  */
 public class BlockInputStream extends InputStream implements Seekable {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockInputStream.class);
+
   private static final int EOF = -1;
 
   private final BlockID blockID;
+  private final long length;
+  private Pipeline pipeline;
+  private final Token<OzoneBlockTokenIdentifier> token;
+  private final boolean verifyChecksum;
   private final String traceID;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
-  private List<ChunkInfo> chunks;
-  // ChunkIndex points to the index current chunk in the buffers or the the
-  // index of chunk which will be read next into the buffers in
-  // readChunkFromContainer().
+  private boolean initialized = false;
+
+  // List of ChunkInputStreams, one for each chunk in the block
+  private List<ChunkInputStream> chunkStreams;
+
+  // chunkOffsets[i] stores the index of the first data byte in
+  // chunkStream i w.r.t the block data.
+  // Let’s say we have chunk size as 40 bytes. And let's say the parent
+  // block stores data from index 200 and has length 400.
+  // The first 40 bytes of this block will be stored in chunk[0], next 40 in
+  // chunk[1] and so on. But since the chunkOffsets are w.r.t the block only
+  // and not the key, the values in chunkOffsets will be [0, 40, 80,....].
+  private long[] chunkOffsets = null;
+
+  // Index of the chunkStream corresponding to the current position of the
+  // BlockInputStream i.e offset of the data to be read next from this block
   private int chunkIndex;
-  // ChunkIndexOfCurrentBuffer points to the index of chunk read into the
-  // buffers or index of the last chunk in the buffers. It is updated only
-  // when a new chunk is read from container into the buffers.
-  private int chunkIndexOfCurrentBuffer;
-  private long[] chunkOffset;
-  private List<ByteBuffer> buffers;
-  private int bufferIndex;
-  private long bufferPosition;
-  private boolean verifyChecksum;
 
-  /**
-   * Creates a new BlockInputStream.
-   *
-   * @param blockID block ID of the chunk
-   * @param xceiverClientManager client manager that controls client
-   * @param xceiverClient client to perform container calls
-   * @param chunks list of chunks to read
-   * @param traceID container protocol call traceID
-   * @param verifyChecksum verify checksum
-   * @param initialPosition the initial position of the stream pointer. This
-   *                        position is seeked now if the up-stream was seeked
-   *                        before this was created.
-   */
-  public BlockInputStream(
-      BlockID blockID, XceiverClientManager xceiverClientManager,
-      XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
-      boolean verifyChecksum, long initialPosition) throws IOException {
-    this.blockID = blockID;
-    this.traceID = traceID;
-    this.xceiverClientManager = xceiverClientManager;
-    this.xceiverClient = xceiverClient;
-    this.chunks = chunks;
-    this.chunkIndex = 0;
-    this.chunkIndexOfCurrentBuffer = -1;
-    // chunkOffset[i] stores offset at which chunk i stores data in
-    // BlockInputStream
-    this.chunkOffset = new long[this.chunks.size()];
-    initializeChunkOffset();
-    this.buffers = null;
-    this.bufferIndex = 0;
-    this.bufferPosition = -1;
+  // Position of the BlockInputStream is maintainted by this variable till
+  // the stream is initialized. This position is w.r.t to the block only and
+  // not the key.
+  // For the above example, if we seek to position 240 before the stream is
+  // initialized, then value of blockPosition will be set to 40.
+  // Once, the stream is initialized, the position of the stream
+  // will be determined by the current chunkStream and its position.
+  private long blockPosition = 0;
+
+  // Tracks the chunkIndex corresponding to the last blockPosition so that it
+  // can be reset if a new position is seeked.
+  private int chunkIndexOfPrevPosition;
+
+  public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      String traceId, XceiverClientManager xceiverClientManager) {
+    this.blockID = blockId;
+    this.length = blockLen;
+    this.pipeline = pipeline;
+    this.token = token;
     this.verifyChecksum = verifyChecksum;
-    if (initialPosition > 0) {
-      // The stream was seeked to a position before the stream was
-      // initialized. So seeking to the position now.
-      seek(initialPosition);
-    }
+    this.traceID = traceId;
+    this.xceiverClientManager = xceiverClientManager;
   }
 
-  private void initializeChunkOffset() {
-    long tempOffset = 0;
-    for (int i = 0; i < chunks.size(); i++) {
-      chunkOffset[i] = tempOffset;
-      tempOffset += chunks.get(i).getLen();
+  /**
+   * Initialize the BlockInputStream. Get the BlockData (list of chunks) from
+   * the Container and create the ChunkInputStreams for each Chunk in the Block.
+   */
+  public synchronized void initialize() throws IOException {
+
+    // Pre-check that the stream has not been intialized already
+    if (initialized) {
+      return;
     }
-  }
 
-  @Override
-  public synchronized int read()
-      throws IOException {
-    checkOpen();
-    int available = prepareRead(1);
-    int dataout = EOF;
+    List<ChunkInfo> chunks = getChunkInfos();
+    if (chunks != null && !chunks.isEmpty()) {
+      // For each chunk in the block, create a ChunkInputStream and compute
+      // its chunkOffset
+      this.chunkOffsets = new long[chunks.size()];
+      long tempOffset = 0;
+
+      this.chunkStreams = new ArrayList<>(chunks.size());
+      for (int i = 0; i < chunks.size(); i++) {
+        addStream(chunks.get(i));
+        chunkOffsets[i] = tempOffset;
+        tempOffset += chunks.get(i).getLen();
+      }
 
-    if (available == EOF) {
-      Preconditions
-          .checkState(buffers == null); //should have released by now, see below
-    } else {
-      dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
-    }
+      initialized = true;
+      this.chunkIndex = 0;
 
-    if (blockStreamEOF()) {
-      // consumer might use getPos to determine EOF,
-      // so release buffers when serving the last byte of data
-      releaseBuffers();
+      if (blockPosition > 0) {
+        // Stream was seeked to blockPosition before initialization. Seek to the
+        // blockPosition now.
+        seek(blockPosition);
+      }
     }
-
-    return dataout;
   }
 
-  @Override
-  public synchronized int read(byte[] b, int off, int len) throws IOException {
-    // According to the JavaDocs for InputStream, it is recommended that
-    // subclasses provide an override of bulk read if possible for performance
-    // reasons.  In addition to performance, we need to do it for correctness
-    // reasons.  The Ozone REST service uses PipedInputStream and
-    // PipedOutputStream to relay HTTP response data between a Jersey thread and
-    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
-    // have a subtle dependency (bug?) on the wrapped stream providing separate
-    // implementations of single-byte read and bulk read.  Without this, get key
-    // responses might close the connection before writing all of the bytes
-    // advertised in the Content-Length.
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return 0;
-    }
-    checkOpen();
-    int total = 0;
-    while (len > 0) {
-      int available = prepareRead(len);
-      if (available == EOF) {
-        Preconditions
-            .checkState(buffers == null); //should have been released by now
-        return total != 0 ? total : EOF;
-      }
-      buffers.get(bufferIndex).get(b, off + total, available);
-      len -= available;
-      total += available;
+  /**
+   * Send RPC call to get the block info from the container.
+   * @return List of chunks in this block.
+   */
+  protected List<ChunkInfo> getChunkInfos() throws IOException {
+    // irrespective of the container state, we will always read via Standalone
+    // protocol.
+    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+      pipeline = Pipeline.newBuilder(pipeline)
+          .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
     }
+    xceiverClient = xceiverClientManager.acquireClient(pipeline);
+    boolean success = false;
+    List<ChunkInfo> chunks;
+    try {
+      LOG.debug("Initializing BlockInputStream for get key to access {}",
+          blockID.getContainerID());
 
-    if (blockStreamEOF()) {
-      // smart consumers determine EOF by calling getPos()
-      // so we release buffers when serving the final bytes of data
-      releaseBuffers();
+      if (token != null) {
+        UserGroupInformation.getCurrentUser().addToken(token);
+      }
+      DatanodeBlockID datanodeBlockID = blockID
+          .getDatanodeBlockIDProtobuf();
+      GetBlockResponseProto response = ContainerProtocolCalls
+          .getBlock(xceiverClient, datanodeBlockID, traceID);
+
+      chunks = response.getBlockData().getChunksList();
+      success = true;
+    } finally {
+      if (!success) {
+        xceiverClientManager.releaseClient(xceiverClient, false);
+      }
     }
 
-    return total;
+    return chunks;
   }
 
   /**
-   * Determines if all data in the stream has been consumed.
-   *
-   * @return true if EOF, false if more data is available
+   * Append another ChunkInputStream to the end of the list. Note that the
+   * ChunkInputStream is only created here. The chunk will be read from the
+   * Datanode only when a read operation is performed on for that chunk.
    */
-  protected boolean blockStreamEOF() {
-    if (buffersHaveData() || chunksRemaining()) {
-      return false;
-    } else {
-      // if there are any chunks, we better be at the last chunk for EOF
-      Preconditions.checkState(((chunks == null) || chunks.isEmpty() ||
-              chunkIndex == (chunks.size() - 1)),
-          "EOF detected, but not at the last chunk");
-      return true;
-    }
-  }
-
-  private void releaseBuffers() {
-    //ashes to ashes, dust to dust
-    buffers = null;
-    bufferIndex = 0;
+  protected synchronized void addStream(ChunkInfo chunkInfo) {
+    chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, traceID,
+        xceiverClient, verifyChecksum));
   }
 
-  @Override
-  public synchronized void close() {
-    if (xceiverClientManager != null && xceiverClient != null) {
-      xceiverClientManager.releaseClient(xceiverClient, false);
-      xceiverClientManager = null;
-      xceiverClient = null;
-    }
+  public synchronized long getRemaining() throws IOException {
+    return length - getPos();
   }
 
   /**
-   * Checks if the stream is open.  If not, throws an exception.
-   *
-   * @throws IOException if stream is closed
+   * {@inheritDoc}
    */
-  private synchronized void checkOpen() throws IOException {
-    if (xceiverClient == null) {
-      throw new IOException("BlockInputStream has been closed.");
+  @Override
+  public synchronized int read() throws IOException {
+    byte[] buf = new byte[1];
+    if (read(buf, 0, 1) == EOF) {
+      return EOF;
     }
+    return Byte.toUnsignedInt(buf[0]);
   }
 
   /**
-   * Prepares to read by advancing through chunks and buffers as needed until it
-   * finds data to return or encounters EOF.
-   *
-   * @param len desired length of data to read
-   * @return length of data available to read, possibly less than desired length
+   * {@inheritDoc}
    */
-  private synchronized int prepareRead(int len) throws IOException {
-    for (;;) {
-      if (!buffersAllocated()) {
-        // The current chunk at chunkIndex has not been read from the
-        // container. Read the chunk and put the data into buffers.
-        readChunkFromContainer();
-      }
-      if (buffersHaveData()) {
-        // Data is available from buffers
-        ByteBuffer bb = buffers.get(bufferIndex);
-        return len > bb.remaining() ? bb.remaining() : len;
-      } else if (chunksRemaining()) {
-        // There are additional chunks available.
-        // Read the next chunk in the block.
-        chunkIndex += 1;
-        readChunkFromContainer();
-      } else {
-        // All available input has been consumed.
-        return EOF;
-      }
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
     }
-  }
-
-  private boolean buffersAllocated() {
-    if (buffers == null || buffers.isEmpty()) {
-      return false;
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
     }
-    return true;
-  }
-
-  private boolean buffersHaveData() {
-    boolean hasData = false;
-
-    if (buffersAllocated()) {
-      while (bufferIndex < (buffers.size())) {
-        if (buffers.get(bufferIndex).hasRemaining()) {
-          // current buffer has data
-          hasData = true;
-          break;
-        } else {
-          if (buffersRemaining()) {
-            // move to next available buffer
-            ++bufferIndex;
-            Preconditions.checkState(bufferIndex < buffers.size());
-          } else {
-            // no more buffers remaining
-            break;
-          }
-        }
-      }
+    if (len == 0) {
+      return 0;
     }
 
-    return hasData;
-  }
+    if (!initialized) {
+      initialize();
+    }
 
-  private boolean buffersRemaining() {
-    return (bufferIndex < (buffers.size() - 1));
-  }
+    checkOpen();
+    int totalReadLen = 0;
+    while (len > 0) {
+      // if we are at the last chunk and have read the entire chunk, return
+      if (chunkStreams.size() == 0 ||
+          (chunkStreams.size() - 1 <= chunkIndex &&
+              chunkStreams.get(chunkIndex)
+                  .getRemaining() == 0)) {
+        return totalReadLen == 0 ? EOF : totalReadLen;
+      }
 
-  private boolean chunksRemaining() {
-    if ((chunks == null) || chunks.isEmpty()) {
-      return false;
-    }
-    // Check if more chunks are remaining in the stream after chunkIndex
-    if (chunkIndex < (chunks.size() - 1)) {
-      return true;
+      // Get the current chunkStream and read data from it
+      ChunkInputStream current = chunkStreams.get(chunkIndex);
+      int numBytesToRead = Math.min(len, (int)current.getRemaining());
+      int numBytesRead = current.read(b, off, numBytesToRead);
+      if (numBytesRead != numBytesToRead) {
+        // This implies that there is either data loss or corruption in the
+        // chunk entries. Even EOF in the current stream would be covered in
+        // this case.
+        throw new IOException(String.format(
+            "Inconsistent read for chunkName=%s length=%d numBytesRead=%d",
+            current.getChunkName(), current.getLength(), numBytesRead));
+      }
+      totalReadLen += numBytesRead;
+      off += numBytesRead;
+      len -= numBytesRead;
+      if (current.getRemaining() <= 0 &&
+          ((chunkIndex + 1) < chunkStreams.size())) {
+        chunkIndex += 1;
+      }
     }
-    // ChunkIndex is the last chunk in the stream. Check if this chunk has
-    // been read from container or not. Return true if chunkIndex has not
-    // been read yet and false otherwise.
-    return chunkIndexOfCurrentBuffer != chunkIndex;
+    return totalReadLen;
   }
 
   /**
-   * Attempts to read the chunk at the specified offset in the chunk list.  If
-   * successful, then the data of the read chunk is saved so that its bytes can
-   * be returned from subsequent read calls.
+   * Seeks the BlockInputStream to the specified position. If the stream is
+   * not initialized, save the seeked position via blockPosition. Otherwise,
+   * update the position in 2 steps:
+   *    1. Updating the chunkIndex to the chunkStream corresponding to the
+   *    seeked position.
+   *    2. Seek the corresponding chunkStream to the adjusted position.
    *
-   * @throws IOException if there is an I/O error while performing the call
+   * Let’s say we have chunk size as 40 bytes. And let's say the parent block
+   * stores data from index 200 and has length 400. If the key was seeked to
+   * position 90, then this block will be seeked to position 90.
+   * When seek(90) is called on this blockStream, then
+   *    1. chunkIndex will be set to 2 (as indices 80 - 120 reside in chunk[2]).
+   *    2. chunkStream[2] will be seeked to position 10
+   *       (= 90 - chunkOffset[2] (= 80)).
    */
-  private synchronized void readChunkFromContainer() throws IOException {
-    // Read the chunk at chunkIndex
-    final ChunkInfo chunkInfo = chunks.get(chunkIndex);
-    ByteString byteString;
-    byteString = readChunk(chunkInfo);
-    buffers = byteString.asReadOnlyByteBufferList();
-    bufferIndex = 0;
-    chunkIndexOfCurrentBuffer = chunkIndex;
-
-    // The bufferIndex and position might need to be adjusted if seek() was
-    // called on the stream before. This needs to be done so that the buffer
-    // position can be advanced to the 'seeked' position.
-    adjustBufferIndex();
-  }
-
-  /**
-   * Send RPC call to get the chunk from the container.
-   */
-  @VisibleForTesting
-  protected ByteString readChunk(final ChunkInfo chunkInfo)
-      throws IOException {
-    ReadChunkResponseProto readChunkResponse;
-    try {
-      List<CheckedBiFunction> validators =
-          ContainerProtocolCalls.getValidatorList();
-      validators.add(validator);
-      readChunkResponse = ContainerProtocolCalls
-          .readChunk(xceiverClient, chunkInfo, blockID, traceID, validators);
-    } catch (IOException e) {
-      if (e instanceof StorageContainerException) {
-        throw e;
-      }
-      throw new IOException("Unexpected OzoneException: " + e.toString(), e);
-    }
-    return readChunkResponse.getData();
-  }
-
-  @VisibleForTesting
-  protected List<DatanodeDetails> getDatanodeList() {
-    return xceiverClient.getPipeline().getNodes();
-  }
-
-  private CheckedBiFunction<ContainerCommandRequestProto,
-      ContainerCommandResponseProto, IOException> validator =
-          (request, response) -> {
-            ReadChunkResponseProto readChunkResponse = response.getReadChunk();
-            final ChunkInfo chunkInfo = readChunkResponse.getChunkData();
-            ByteString byteString = readChunkResponse.getData();
-            if (byteString.size() != chunkInfo.getLen()) {
-              // Bytes read from chunk should be equal to chunk size.
-              throw new OzoneChecksumException(String
-                  .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
-                      chunkInfo.getChunkName(), chunkInfo.getLen(),
-                      byteString.size()));
-            }
-            ChecksumData checksumData =
-                ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
-            if (verifyChecksum) {
-              Checksum.verifyChecksum(byteString, checksumData);
-            }
-          };
-
   @Override
   public synchronized void seek(long pos) throws IOException {
-    if (pos < 0 || (chunks.size() == 0 && pos > 0)
-        || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
-        .getLen()) {
-      throw new EOFException("EOF encountered pos: " + pos + " container key: "
-          + blockID.getLocalID());
+    if (!initialized) {
+      // Stream has not been initialized yet. Save the position so that it
+      // can be seeked when the stream is initialized.
+      blockPosition = pos;
+      return;
+    }
+
+    checkOpen();
+    if (pos < 0 || pos >= length) {
+      if (pos == 0) {
+        // It is possible for length and pos to be zero in which case
+        // seek should return instead of throwing exception
+        return;
+      }
+      throw new EOFException(
+          "EOF encountered at pos: " + pos + " for block: " + blockID);
     }
 
-    if (pos < chunkOffset[chunkIndex]) {
-      chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
-    } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
-        .getLen()) {
+    if (chunkIndex >= chunkStreams.size()) {
+      chunkIndex = Arrays.binarySearch(chunkOffsets, pos);
+    } else if (pos < chunkOffsets[chunkIndex]) {
       chunkIndex =
-          Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos);
+          Arrays.binarySearch(chunkOffsets, 0, chunkIndex, pos);
+    } else if (pos >= chunkOffsets[chunkIndex] + chunkStreams
+        .get(chunkIndex).getLength()) {
+      chunkIndex = Arrays.binarySearch(chunkOffsets,
+          chunkIndex + 1, chunkStreams.size(), pos);
     }
     if (chunkIndex < 0) {
       // Binary search returns -insertionPoint - 1  if element is not present
       // in the array. insertionPoint is the point at which element would be
       // inserted in the sorted array. We need to adjust the chunkIndex
       // accordingly so that chunkIndex = insertionPoint - 1
-      chunkIndex = -chunkIndex -2;
+      chunkIndex = -chunkIndex - 2;
     }
 
-    // The bufferPosition should be adjusted to account for the chunk offset
-    // of the chunk the the pos actually points to.
-    bufferPosition = pos - chunkOffset[chunkIndex];
+    // Reset the previous chunkStream's position
+    chunkStreams.get(chunkIndexOfPrevPosition).resetPosition();
 
-    // Check if current buffers correspond to the chunk index being seeked
-    // and if the buffers have any data.
-    if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) {
-      // Position the buffer to the seeked position.
-      adjustBufferIndex();
-    } else {
-      // Release the current buffers. The next readChunkFromContainer will
-      // read the required chunk and position the buffer to the seeked
-      // position.
-      releaseBuffers();
-    }
+    // seek to the proper offset in the ChunkInputStream
+    chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]);
+    chunkIndexOfPrevPosition = chunkIndex;
   }
 
-  private void adjustBufferIndex() {
-    if (bufferPosition == -1) {
-      // The stream has not been seeked to a position. No need to adjust the
-      // buffer Index and position.
-      return;
+  @Override
+  public synchronized long getPos() throws IOException {
+    if (length == 0) {
+      return 0;
     }
-    // The bufferPosition is w.r.t the buffers for current chunk.
-    // Adjust the bufferIndex and position to the seeked position.
-    long tempOffest = 0;
-    for (int i = 0; i < buffers.size(); i++) {
-      if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
-        tempOffest += buffers.get(i).capacity();
-      } else {
-        bufferIndex = i;
-        break;
-      }
+
+    if (!initialized) {
+      // The stream is not initialized yet. Return the blockPosition
+      return blockPosition;
+    } else {
+      return chunkOffsets[chunkIndex] + chunkStreams.get(chunkIndex).getPos();
     }
-    buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
-    // Reset the bufferPosition as the seek() operation has been completed.
-    bufferPosition = -1;
   }
 
   @Override
-  public synchronized long getPos() throws IOException {
-    // position = chunkOffset of current chunk (at chunkIndex) + position of
-    // the buffer corresponding to the chunk.
-    long bufferPos = 0;
-
-    if (bufferPosition >= 0) {
-      // seek has been called but the buffers were empty. Hence, the buffer
-      // position will be advanced after the buffers are filled.
-      // We return the chunkOffset + bufferPosition here as that will be the
-      // position of the buffer pointer after reading the chunk file.
-      bufferPos = bufferPosition;
-
-    } else if (blockStreamEOF()) {
-      // all data consumed, buffers have been released.
-      // get position from the chunk offset and chunk length of last chunk
-      bufferPos = chunks.get(chunkIndex).getLen();
-
-    } else if (buffersAllocated()) {
-      // get position from available buffers of current chunk
-      bufferPos = buffers.get(bufferIndex).position();
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
 
+  @Override
+  public synchronized void close() {
+    if (xceiverClientManager != null && xceiverClient != null) {
+      xceiverClientManager.releaseClient(xceiverClient, false);
+      xceiverClientManager = null;
+      xceiverClient = null;
     }
+  }
 
-    return chunkOffset[chunkIndex] + bufferPos;
+  public synchronized void resetPosition() {
+    this.blockPosition = 0;
   }
 
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    return false;
+  /**
+   * Checks if the stream is open.  If not, throw an exception.
+   *
+   * @throws IOException if stream is closed
+   */
+  protected synchronized void checkOpen() throws IOException {
+    if (xceiverClient == null) {
+      throw new IOException("BlockInputStream has been closed.");
+    }
   }
 
   public BlockID getBlockID() {
     return blockID;
   }
 
+  public long getLength() {
+    return length;
+  }
+
   @VisibleForTesting
-  protected int getChunkIndex() {
+  synchronized int getChunkIndex() {
     return chunkIndex;
   }
+
+  @VisibleForTesting
+  synchronized long getBlockPosition() {
+    return blockPosition;
+  }
+
+  @VisibleForTesting
+  synchronized List<ChunkInputStream> getChunkStreams() {
+    return chunkStreams;
+  }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
new file mode 100644
index 0000000..8d30c22
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * An {@link InputStream} called from BlockInputStream to read a chunk from the
+ * container. Each chunk may contain multiple underlying {@link ByteBuffer}
+ * instances.
+ */
+public class ChunkInputStream extends InputStream implements Seekable {
+
+  private ChunkInfo chunkInfo;
+  private final long length;
+  private final BlockID blockID;
+  private final String traceID;
+  private XceiverClientSpi xceiverClient;
+  private boolean verifyChecksum;
+  private boolean allocated = false;
+
+  // Buffer to store the chunk data read from the DN container
+  private List<ByteBuffer> buffers;
+
+  // Index of the buffers corresponding to the current position of the buffers
+  private int bufferIndex;
+
+  // The offset of the current data residing in the buffers w.r.t the start
+  // of chunk data
+  private long bufferOffset;
+
+  // The number of bytes of chunk data residing in the buffers currently
+  private long bufferLength;
+
+  // Position of the ChunkInputStream is maintained by this variable (if a
+  // seek is performed. This position is w.r.t to the chunk only and not the
+  // block or key. This variable is set only if either the buffers are not
+  // yet allocated or the if the allocated buffers do not cover the seeked
+  // position. Once the chunk is read, this variable is reset.
+  private long chunkPosition = -1;
+
+  private static final int EOF = -1;
+
+  ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
+      String traceId, XceiverClientSpi xceiverClient, boolean verifyChecksum) {
+    this.chunkInfo = chunkInfo;
+    this.length = chunkInfo.getLen();
+    this.blockID = blockId;
+    this.traceID = traceId;
+    this.xceiverClient = xceiverClient;
+    this.verifyChecksum = verifyChecksum;
+  }
+
+  public synchronized long getRemaining() throws IOException {
+    return length - getPos();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized int read() throws IOException {
+    checkOpen();
+    int available = prepareRead(1);
+    int dataout = EOF;
+
+    if (available == EOF) {
+      // There is no more data in the chunk stream. The buffers should have
+      // been released by now
+      Preconditions.checkState(buffers == null);
+    } else {
+      dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+    }
+
+    if (chunkStreamEOF()) {
+      // consumer might use getPos to determine EOF,
+      // so release buffers when serving the last byte of data
+      releaseBuffers();
+    }
+
+    return dataout;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    // According to the JavaDocs for InputStream, it is recommended that
+    // subclasses provide an override of bulk read if possible for performance
+    // reasons.  In addition to performance, we need to do it for correctness
+    // reasons.  The Ozone REST service uses PipedInputStream and
+    // PipedOutputStream to relay HTTP response data between a Jersey thread and
+    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
+    // have a subtle dependency (bug?) on the wrapped stream providing separate
+    // implementations of single-byte read and bulk read.  Without this, get key
+    // responses might close the connection before writing all of the bytes
+    // advertised in the Content-Length.
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    checkOpen();
+    int total = 0;
+    while (len > 0) {
+      int available = prepareRead(len);
+      if (available == EOF) {
+        // There is no more data in the chunk stream. The buffers should have
+        // been released by now
+        Preconditions.checkState(buffers == null);
+        return total != 0 ? total : EOF;
+      }
+      buffers.get(bufferIndex).get(b, off + total, available);
+      len -= available;
+      total += available;
+    }
+
+    if (chunkStreamEOF()) {
+      // smart consumers determine EOF by calling getPos()
+      // so we release buffers when serving the final bytes of data
+      releaseBuffers();
+    }
+
+    return total;
+  }
+
+  /**
+   * Seeks the ChunkInputStream to the specified position. This is done by
+   * updating the chunkPosition to the seeked position in case the buffers
+   * are not allocated or buffers do not contain the data corresponding to
+   * the seeked position (determined by buffersHavePosition()). Otherwise,
+   * the buffers position is updated to the seeked position.
+   */
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    if (pos < 0 || pos >= length) {
+      if (pos == 0) {
+        // It is possible for length and pos to be zero in which case
+        // seek should return instead of throwing exception
+        return;
+      }
+      throw new EOFException("EOF encountered at pos: " + pos + " for chunk: "
+          + chunkInfo.getChunkName());
+    }
+
+    if (buffersHavePosition(pos)) {
+      // The bufferPosition is w.r.t the current chunk.
+      // Adjust the bufferIndex and position to the seeked position.
+      adjustBufferPosition(pos - bufferOffset);
+    } else {
+      chunkPosition = pos;
+    }
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    if (chunkPosition >= 0) {
+      return chunkPosition;
+    }
+    if (chunkStreamEOF()) {
+      return length;
+    }
+    if (buffersHaveData()) {
+      return bufferOffset + buffers.get(bufferIndex).position();
+    }
+    if (buffersAllocated()) {
+      return bufferOffset + bufferLength;
+    }
+    return 0;
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized void close() {
+    if (xceiverClient != null) {
+      xceiverClient = null;
+    }
+  }
+
+  /**
+   * Checks if the stream is open.  If not, throw an exception.
+   *
+   * @throws IOException if stream is closed
+   */
+  protected synchronized void checkOpen() throws IOException {
+    if (xceiverClient == null) {
+      throw new IOException("BlockInputStream has been closed.");
+    }
+  }
+
+  /**
+   * Prepares to read by advancing through buffers or allocating new buffers,
+   * as needed until it finds data to return, or encounters EOF.
+   * @param len desired lenght of data to read
+   * @return length of data available to read, possibly less than desired length
+   */
+  private synchronized int prepareRead(int len) throws IOException {
+    for (;;) {
+      if (chunkPosition >= 0) {
+        if (buffersHavePosition(chunkPosition)) {
+          // The current buffers have the seeked position. Adjust the buffer
+          // index and position to point to the chunkPosition.
+          adjustBufferPosition(chunkPosition - bufferOffset);
+        } else {
+          // Read a required chunk data to fill the buffers with seeked
+          // position data
+          readChunkFromContainer(len);
+        }
+      }
+      if (buffersHaveData()) {
+        // Data is available from buffers
+        ByteBuffer bb = buffers.get(bufferIndex);
+        return len > bb.remaining() ? bb.remaining() : len;
+      } else  if (dataRemainingInChunk()) {
+        // There is more data in the chunk stream which has not
+        // been read into the buffers yet.
+        readChunkFromContainer(len);
+      } else {
+        // All available input from this chunk stream has been consumed.
+        return EOF;
+      }
+    }
+  }
+
+  /**
+   * Reads full or partial Chunk from DN Container based on the current
+   * position of the ChunkInputStream, the number of bytes of data to read
+   * and the checksum boundaries.
+   * If successful, then the read data in saved in the buffers so that
+   * subsequent read calls can utilize it.
+   * @param len number of bytes of data to be read
+   * @throws IOException if there is an I/O error while performing the call
+   * to Datanode
+   */
+  private synchronized void readChunkFromContainer(int len) throws IOException {
+
+    // index of first byte to be read from the chunk
+    long startByteIndex;
+    if (chunkPosition >= 0) {
+      // If seek operation was called to advance the buffer position, the
+      // chunk should be read from that position onwards.
+      startByteIndex = chunkPosition;
+    } else {
+      // Start reading the chunk from the last chunkPosition onwards.
+      startByteIndex = bufferOffset + bufferLength;
+    }
+
+    if (verifyChecksum) {
+      // Update the bufferOffset and bufferLength as per the checksum
+      // boundary requirement.
+      computeChecksumBoundaries(startByteIndex, len);
+    } else {
+      // Read from the startByteIndex
+      bufferOffset = startByteIndex;
+      bufferLength = len;
+    }
+
+    // Adjust the chunkInfo so that only the required bytes are read from
+    // the chunk.
+    final ChunkInfo adjustedChunkInfo = ChunkInfo.newBuilder(chunkInfo)
+        .setOffset(bufferOffset)
+        .setLen(bufferLength)
+        .build();
+
+    ByteString byteString = readChunk(adjustedChunkInfo);
+
+    buffers = byteString.asReadOnlyByteBufferList();
+    bufferIndex = 0;
+    allocated = true;
+
+    // If the stream was seeked to position before, then the buffer
+    // position should be adjusted as the reads happen at checksum boundaries.
+    // The buffers position might need to be adjusted for the following
+    // scenarios:
+    //    1. Stream was seeked to a position before the chunk was read
+    //    2. Chunk was read from index < the current position to account for
+    //    checksum boundaries.
+    adjustBufferPosition(startByteIndex - bufferOffset);
+  }
+
+  /**
+   * Send RPC call to get the chunk from the container.
+   */
+  @VisibleForTesting
+  protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException {
+    ReadChunkResponseProto readChunkResponse;
+
+    try {
+      List<CheckedBiFunction> validators =
+          ContainerProtocolCalls.getValidatorList();
+      validators.add(validator);
+
+      readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
+          readChunkInfo, blockID, traceID, validators);
+
+    } catch (IOException e) {
+      if (e instanceof StorageContainerException) {
+        throw e;
+      }
+      throw new IOException("Unexpected OzoneException: " + e.toString(), e);
+    }
+
+    return readChunkResponse.getData();
+  }
+
+  private CheckedBiFunction<ContainerCommandRequestProto,
+      ContainerCommandResponseProto, IOException> validator =
+          (request, response) -> {
+            final ChunkInfo reqChunkInfo =
+                request.getReadChunk().getChunkData();
+
+            ReadChunkResponseProto readChunkResponse = response.getReadChunk();
+            ByteString byteString = readChunkResponse.getData();
+
+            if (byteString.size() != reqChunkInfo.getLen()) {
+              // Bytes read from chunk should be equal to chunk size.
+              throw new OzoneChecksumException(String
+                  .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
+                      reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
+                      byteString.size()));
+            }
+
+            if (verifyChecksum) {
+              ChecksumData checksumData = ChecksumData.getFromProtoBuf(
+                  chunkInfo.getChecksumData());
+
+              // ChecksumData stores checksum for each 'numBytesPerChecksum'
+              // number of bytes in a list. Compute the index of the first
+              // checksum to match with the read data
+
+              int checkumStartIndex = (int) (reqChunkInfo.getOffset() /
+                  checksumData.getBytesPerChecksum());
+              Checksum.verifyChecksum(
+                  byteString, checksumData, checkumStartIndex);
+            }
+          };
+
+  /**
+   * Return the offset and length of bytes that need to be read from the
+   * chunk file to cover the checksum boundaries covering the actual start and
+   * end of the chunk index to be read.
+   * For example, lets say the client is reading from index 120 to 450 in the
+   * chunk. And let's say checksum is stored for every 100 bytes in the chunk
+   * i.e. the first checksum is for bytes from index 0 to 99, the next for
+   * bytes from index 100 to 199 and so on. To verify bytes from 120 to 450,
+   * we would need to read from bytes 100 to 499 so that checksum
+   * verification can be done.
+   *
+   * @param startByteIndex the first byte index to be read by client
+   * @param dataLen number of bytes to be read from the chunk
+   */
+  private void computeChecksumBoundaries(long startByteIndex, int dataLen) {
+
+    int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum();
+    // index of the last byte to be read from chunk, inclusively.
+    final long endByteIndex = startByteIndex + dataLen - 1;
+
+    bufferOffset =  (startByteIndex / bytesPerChecksum)
+        * bytesPerChecksum; // inclusive
+    final long endIndex = ((endByteIndex / bytesPerChecksum) + 1)
+        * bytesPerChecksum; // exclusive
+    bufferLength = Math.min(endIndex, length) - bufferOffset;
+  }
+
+  /**
+   * Adjust the buffers position to account for seeked position and/ or checksum
+   * boundary reads.
+   * @param bufferPosition the position to which the buffers must be advanced
+   */
+  private void adjustBufferPosition(long bufferPosition) {
+    // The bufferPosition is w.r.t the current chunk.
+    // Adjust the bufferIndex and position to the seeked chunkPosition.
+    long tempOffest = 0;
+    for (int i = 0; i < buffers.size(); i++) {
+      if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
+        tempOffest += buffers.get(i).capacity();
+      } else {
+        bufferIndex = i;
+        break;
+      }
+    }
+    buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
+
+    // Reset the chunkPosition as chunk stream has been initialized i.e. the
+    // buffers have been allocated.
+    resetPosition();
+  }
+
+  /**
+   * Check if the buffers have been allocated data and false otherwise.
+   */
+  private boolean buffersAllocated() {
+    return buffers != null && !buffers.isEmpty();
+  }
+
+  /**
+   * Check if the buffers have any data remaining between the current
+   * position and the limit.
+   */
+  private boolean buffersHaveData() {
+    boolean hasData = false;
+
+    if (buffersAllocated()) {
+      while (bufferIndex < (buffers.size())) {
+        if (buffers.get(bufferIndex).hasRemaining()) {
+          // current buffer has data
+          hasData = true;
+          break;
+        } else {
+          if (buffersRemaining()) {
+            // move to next available buffer
+            ++bufferIndex;
+            Preconditions.checkState(bufferIndex < buffers.size());
+          } else {
+            // no more buffers remaining
+            break;
+          }
+        }
+      }
+    }
+
+    return hasData;
+  }
+
+  private boolean buffersRemaining() {
+    return (bufferIndex < (buffers.size() - 1));
+  }
+
+  /**
+   * Check if curernt buffers have the data corresponding to the input position.
+   */
+  private boolean buffersHavePosition(long pos) {
+    // Check if buffers have been allocated
+    if (buffersAllocated()) {
+      // Check if the current buffers cover the input position
+      return pos >= bufferOffset &&
+          pos < bufferOffset + bufferLength;
+    }
+    return false;
+  }
+
+  /**
+   * Check if there is more data in the chunk which has not yet been read
+   * into the buffers.
+   */
+  private boolean dataRemainingInChunk() {
+    long bufferPos;
+    if (chunkPosition >= 0) {
+      bufferPos = chunkPosition;
+    } else {
+      bufferPos = bufferOffset + bufferLength;
+    }
+
+    return bufferPos < length;
+  }
+
+  /**
+   * Check if end of chunkStream has been reached.
+   */
+  private boolean chunkStreamEOF() {
+    if (!allocated) {
+      // Chunk data has not been read yet
+      return false;
+    }
+
+    if (buffersHaveData() || dataRemainingInChunk()) {
+      return false;
+    } else {
+      Preconditions.checkState(bufferOffset + bufferLength == length,
+          "EOF detected, but not at the last byte of the chunk");
+      return true;
+    }
+  }
+
+  /**
+   * If EOF is reached, release the buffers.
+   */
+  private void releaseBuffers() {
+    buffers = null;
+    bufferIndex = 0;
+  }
+
+  /**
+   * Reset the chunkPosition once the buffers are allocated.
+   */
+  void resetPosition() {
+    this.chunkPosition = -1;
+  }
+
+  String getChunkName() {
+    return chunkInfo.getChunkName();
+  }
+
+  protected long getLength() {
+    return length;
+  }
+
+  @VisibleForTesting
+  protected long getChunkPosition() {
+    return chunkPosition;
+  }
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index b6ceb2b..a1985f0 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -1,32 +1,33 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
+
 package org.apache.hadoop.hdds.scm.storage;
 
+import com.google.common.primitives.Bytes;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ChecksumData;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,106 +35,127 @@ import org.junit.Test;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
-import java.util.UUID;
+
+import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
 
 /**
- * Tests {@link BlockInputStream}.
+ * Tests for {@link BlockInputStream}'s functionality.
  */
 public class TestBlockInputStream {
 
-  private static BlockInputStream blockInputStream;
-  private static List<ChunkInfo> chunks;
-  private static int blockSize;
+  private static final int CHUNK_SIZE = 100;
+  private static Checksum checksum;
 
-  private static final int CHUNK_SIZE = 20;
+  private BlockInputStream blockStream;
+  private byte[] blockData;
+  private int blockSize;
+  private List<ChunkInfo> chunks;
+  private Map<String, byte[]> chunkDataMap;
 
   @Before
   public void setup() throws Exception {
     BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
-    chunks = createChunkList(10);
-    String traceID = UUID.randomUUID().toString();
-    blockInputStream = new DummyBlockInputStream(blockID, null, null, chunks,
-        traceID, false, 0);
-
-    blockSize = 0;
-    for (ChunkInfo chunk : chunks) {
-      blockSize += chunk.getLen();
-    }
+    checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE);
+    createChunkList(5);
+
+    blockStream = new DummyBlockInputStream(blockID, blockSize, null, null,
+        false, null, null);
   }
 
   /**
    * Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE
    * and the last chunk with length CHUNK_SIZE/2.
-   * @param numChunks
-   * @return
    */
-  private static List<ChunkInfo> createChunkList(int numChunks) {
-    ChecksumData dummyChecksumData = ChecksumData.newBuilder()
-        .setType(ChecksumType.NONE)
-        .setBytesPerChecksum(100)
-        .build();
-    List<ChunkInfo> chunkList = new ArrayList<>(numChunks);
-    int i;
-    for (i = 0; i < numChunks - 1; i++) {
-      String chunkName = "chunk-" + i;
+  private void createChunkList(int numChunks)
+      throws Exception {
+
+    chunks = new ArrayList<>(numChunks);
+    chunkDataMap = new HashMap<>();
+    blockData = new byte[0];
+    int i, chunkLen;
+    byte[] byteData;
+    String chunkName;
+
+    for (i = 0; i < numChunks; i++) {
+      chunkName = "chunk-" + i;
+      chunkLen = CHUNK_SIZE;
+      if (i == numChunks - 1) {
+        chunkLen = CHUNK_SIZE / 2;
+      }
+      byteData = generateRandomData(chunkLen);
       ChunkInfo chunkInfo = ChunkInfo.newBuilder()
           .setChunkName(chunkName)
           .setOffset(0)
-          .setLen(CHUNK_SIZE)
-          .setChecksumData(dummyChecksumData)
+          .setLen(chunkLen)
+          .setChecksumData(checksum.computeChecksum(
+              byteData, 0, chunkLen).getProtoBufMessage())
           .build();
-      chunkList.add(chunkInfo);
+
+      chunkDataMap.put(chunkName, byteData);
+      chunks.add(chunkInfo);
+
+      blockSize += chunkLen;
+      blockData = Bytes.concat(blockData, byteData);
     }
-    ChunkInfo chunkInfo = ChunkInfo.newBuilder()
-        .setChunkName("chunk-" + i)
-        .setOffset(0)
-        .setLen(CHUNK_SIZE/2)
-        .setChecksumData(dummyChecksumData)
-        .build();
-    chunkList.add(chunkInfo);
-
-    return chunkList;
   }
 
   /**
-   * A dummy BlockInputStream to test the functionality of BlockInputStream.
+   * A dummy BlockInputStream to mock read block call to DN.
    */
-  private static class DummyBlockInputStream extends BlockInputStream {
+  private class DummyBlockInputStream extends BlockInputStream {
 
-    DummyBlockInputStream(BlockID blockID,
-        XceiverClientManager xceiverClientManager,
-        XceiverClientSpi xceiverClient,
-        List<ChunkInfo> chunks,
-        String traceID,
+    DummyBlockInputStream(BlockID blockId,
+        long blockLen,
+        Pipeline pipeline,
+        Token<OzoneBlockTokenIdentifier> token,
         boolean verifyChecksum,
-        long initialPosition) throws IOException {
-      super(blockID, xceiverClientManager, xceiverClient, chunks, traceID,
-          verifyChecksum, initialPosition);
+        String traceId,
+        XceiverClientManager xceiverClientManager) {
+      super(blockId, blockLen, pipeline, token, verifyChecksum,
+          traceId, xceiverClientManager);
     }
 
     @Override
-    protected ByteString readChunk(final ChunkInfo chunkInfo)
-        throws IOException {
-      return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
+    protected List<ChunkInfo> getChunkInfos() {
+      return chunks;
     }
 
     @Override
-    protected List<DatanodeDetails> getDatanodeList() {
-      // return an empty dummy list of size 10
-      return new ArrayList<>(10);
+    protected void addStream(ChunkInfo chunkInfo) {
+      TestChunkInputStream testChunkInputStream = new TestChunkInputStream();
+      getChunkStreams().add(testChunkInputStream.new DummyChunkInputStream(
+          chunkInfo, null, null, null, false,
+          chunkDataMap.get(chunkInfo.getChunkName()).clone()));
     }
 
-    /**
-     * Create ByteString with the input data to return when a readChunk call is
-     * placed.
-     */
-    private static ByteString getByteString(String data, int length) {
-      while (data.length() < length) {
-        data = data + "0";
-      }
-      return ByteString.copyFrom(data.getBytes(), 0, length);
+    @Override
+    protected synchronized void checkOpen() throws IOException {
+      // No action needed
+    }
+  }
+
+  private void seekAndVerify(int pos) throws Exception {
+    blockStream.seek(pos);
+    Assert.assertEquals("Current position of buffer does not match with the " +
+        "seeked position", pos, blockStream.getPos());
+  }
+
+  /**
+   * Match readData with the chunkData byte-wise.
+   * @param readData Data read through ChunkInputStream
+   * @param inputDataStartIndex first index (inclusive) in chunkData to compare
+   *                            with read data
+   * @param length the number of bytes of data to match starting from
+   *               inputDataStartIndex
+   */
+  private void matchWithInputData(byte[] readData, int inputDataStartIndex,
+      int length) {
+    for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
+      Assert.assertEquals(blockData[i], readData[i - inputDataStartIndex]);
     }
   }
 
@@ -143,17 +165,26 @@ public class TestBlockInputStream {
     int pos = 0;
     seekAndVerify(pos);
     Assert.assertEquals("ChunkIndex is incorrect", 0,
-        blockInputStream.getChunkIndex());
+        blockStream.getChunkIndex());
 
+    // Before BlockInputStream is initialized (initialization happens during
+    // read operation), seek should update the BlockInputStream#blockPosition
     pos = CHUNK_SIZE;
     seekAndVerify(pos);
+    Assert.assertEquals("ChunkIndex is incorrect", 0,
+        blockStream.getChunkIndex());
+    Assert.assertEquals(pos, blockStream.getBlockPosition());
+
+    // Initialize the BlockInputStream. After initializtion, the chunkIndex
+    // should be updated to correspond to the seeked position.
+    blockStream.initialize();
     Assert.assertEquals("ChunkIndex is incorrect", 1,
-        blockInputStream.getChunkIndex());
+        blockStream.getChunkIndex());
 
-    pos = (CHUNK_SIZE * 5) + 5;
+    pos = (CHUNK_SIZE * 4) + 5;
     seekAndVerify(pos);
-    Assert.assertEquals("ChunkIndex is incorrect", 5,
-        blockInputStream.getChunkIndex());
+    Assert.assertEquals("ChunkIndex is incorrect", 4,
+        blockStream.getChunkIndex());
 
     try {
       // Try seeking beyond the blockSize.
@@ -161,7 +192,7 @@ public class TestBlockInputStream {
       seekAndVerify(pos);
       Assert.fail("Seek to position beyond block size should fail.");
     } catch (EOFException e) {
-      // Expected
+      System.out.println(e);
     }
 
     // Seek to random positions between 0 and the block size.
@@ -173,20 +204,32 @@ public class TestBlockInputStream {
   }
 
   @Test
-  public void testBlockEOF() throws Exception {
-    // Seek to some position < blockSize and verify EOF is not reached.
-    seekAndVerify(CHUNK_SIZE);
-    Assert.assertFalse(blockInputStream.blockStreamEOF());
-
-    // Seek to blockSize-1 and verify that EOF is not reached as the chunk
-    // has not been read from container yet.
-    seekAndVerify(blockSize-1);
-    Assert.assertFalse(blockInputStream.blockStreamEOF());
+  public void testRead() throws Exception {
+    // read 200 bytes of data starting from position 50. Chunk0 contains
+    // indices 0 to 99, chunk1 from 100 to 199 and chunk3 from 200 to 299. So
+    // the read should result in 3 ChunkInputStream reads
+    seekAndVerify(50);
+    byte[] b = new byte[200];
+    blockStream.read(b, 0, 200);
+    matchWithInputData(b, 50, 200);
+
+    // The new position of the blockInputStream should be the last index read
+    // + 1.
+    Assert.assertEquals(250, blockStream.getPos());
+    Assert.assertEquals(2, blockStream.getChunkIndex());
   }
 
-  private void seekAndVerify(int pos) throws Exception {
-    blockInputStream.seek(pos);
-    Assert.assertEquals("Current position of buffer does not match with the " +
-            "seeked position", pos, blockInputStream.getPos());
+  @Test
+  public void testSeekAndRead() throws Exception {
+    // Seek to a position and read data
+    seekAndVerify(50);
+    byte[] b1 = new byte[100];
+    blockStream.read(b1, 0, 100);
+    matchWithInputData(b1, 50, 100);
+
+    // Next read should start from the position of the last read + 1 i.e. 100
+    byte[] b2 = new byte[100];
+    blockStream.read(b2, 0, 100);
+    matchWithInputData(b2, 150, 100);
   }
 }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
new file mode 100644
index 0000000..b113bc7
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Tests for {@link ChunkInputStream}'s functionality.
+ */
+public class TestChunkInputStream {
+
+  private static final int CHUNK_SIZE = 100;
+  private static final int BYTES_PER_CHECKSUM = 20;
+  private static final String CHUNK_NAME = "dummyChunk";
+  private static final Random RANDOM = new Random();
+  private static Checksum checksum;
+
+  private DummyChunkInputStream chunkStream;
+  private ChunkInfo chunkInfo;
+  private byte[] chunkData;
+
+  @Before
+  public void setup() throws Exception {
+    checksum = new Checksum(ChecksumType.valueOf(
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT),
+        BYTES_PER_CHECKSUM);
+
+    chunkData = generateRandomData(CHUNK_SIZE);
+
+    chunkInfo = ChunkInfo.newBuilder()
+        .setChunkName(CHUNK_NAME)
+        .setOffset(0)
+        .setLen(CHUNK_SIZE)
+        .setChecksumData(checksum.computeChecksum(
+            chunkData, 0, CHUNK_SIZE).getProtoBufMessage())
+        .build();
+
+    chunkStream = new DummyChunkInputStream(chunkInfo, null, null, null, true);
+  }
+
+  static byte[] generateRandomData(int length) {
+    byte[] bytes = new byte[length];
+    RANDOM.nextBytes(bytes);
+    return bytes;
+  }
+
+  /**
+   * A dummy ChunkInputStream to mock read chunk calls to DN.
+   */
+  public class DummyChunkInputStream extends ChunkInputStream {
+
+    // Stores the read chunk data in each readChunk call
+    private List<ByteString> readByteBuffers = new ArrayList<>();
+
+    DummyChunkInputStream(ChunkInfo chunkInfo,
+        BlockID blockId,
+        String traceId,
+        XceiverClientSpi xceiverClient,
+        boolean verifyChecksum) {
+      super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum);
+    }
+
+    public DummyChunkInputStream(ChunkInfo chunkInfo,
+        BlockID blockId,
+        String traceId,
+        XceiverClientSpi xceiverClient,
+        boolean verifyChecksum,
+        byte[] data) {
+      super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum);
+      chunkData = data;
+    }
+
+    @Override
+    protected ByteString readChunk(ChunkInfo readChunkInfo) {
+      ByteString byteString = ByteString.copyFrom(chunkData,
+          (int) readChunkInfo.getOffset(),
+          (int) readChunkInfo.getLen());
+      readByteBuffers.add(byteString);
+      return byteString;
+    }
+
+    @Override
+    protected void checkOpen() {
+      // No action needed
+    }
+  }
+
+  /**
+   * Match readData with the chunkData byte-wise.
+   * @param readData Data read through ChunkInputStream
+   * @param inputDataStartIndex first index (inclusive) in chunkData to compare
+   *                            with read data
+   * @param length the number of bytes of data to match starting from
+   *               inputDataStartIndex
+   */
+  private void matchWithInputData(byte[] readData, int inputDataStartIndex,
+      int length) {
+    for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
+      Assert.assertEquals(chunkData[i], readData[i - inputDataStartIndex]);
+    }
+  }
+
+  /**
+   * Seek to a position and verify through getPos().
+   */
+  private void seekAndVerify(int pos) throws Exception {
+    chunkStream.seek(pos);
+    Assert.assertEquals("Current position of buffer does not match with the " +
+        "seeked position", pos, chunkStream.getPos());
+  }
+
+  @Test
+  public void testFullChunkRead() throws Exception {
+    byte[] b = new byte[CHUNK_SIZE];
+    chunkStream.read(b, 0, CHUNK_SIZE);
+
+    matchWithInputData(b, 0, CHUNK_SIZE);
+  }
+
+  @Test
+  public void testPartialChunkRead() throws Exception {
+    int len = CHUNK_SIZE / 2;
+    byte[] b = new byte[len];
+
+    chunkStream.read(b, 0, len);
+
+    matchWithInputData(b, 0, len);
+
+    // To read chunk data from index 0 to 49 (len = 50), we need to read
+    // chunk from offset 0 to 60 as the checksum boundary is at every 20
+    // bytes. Verify that 60 bytes of chunk data are read and stored in the
+    // buffers.
+    matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(),
+        0, 60);
+
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    seekAndVerify(0);
+
+    try {
+      seekAndVerify(CHUNK_SIZE);
+      Assert.fail("Seeking to Chunk Length should fail.");
+    } catch (EOFException e) {
+      GenericTestUtils.assertExceptionContains("EOF encountered at pos: "
+          + CHUNK_SIZE + " for chunk: " + CHUNK_NAME, e);
+    }
+
+    // Seek before read should update the ChunkInputStream#chunkPosition
+    seekAndVerify(25);
+    Assert.assertEquals(25, chunkStream.getChunkPosition());
+
+    // Read from the seeked position.
+    // Reading from index 25 to 54 should result in the ChunkInputStream
+    // copying chunk data from index 20 to 59 into the buffers (checksum
+    // boundaries).
+    byte[] b = new byte[30];
+    chunkStream.read(b, 0, 30);
+    matchWithInputData(b, 25, 30);
+    matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(),
+        20, 40);
+
+    // After read, the position of the chunkStream is evaluated from the
+    // buffers and the chunkPosition should be reset to -1.
+    Assert.assertEquals(-1, chunkStream.getChunkPosition());
+
+    // Seek to a position within the current buffers. Current buffers contain
+    // data from index 20 to 59. ChunkPosition should still not be used to
+    // set the position.
+    seekAndVerify(35);
+    Assert.assertEquals(-1, chunkStream.getChunkPosition());
+
+    // Seek to a position outside the current buffers. In this case, the
+    // chunkPosition should be updated to the seeked position.
+    seekAndVerify(75);
+    Assert.assertEquals(75, chunkStream.getChunkPosition());
+  }
+
+  @Test
+  public void testSeekAndRead() throws Exception {
+    // Seek to a position and read data
+    seekAndVerify(50);
+    byte[] b1 = new byte[20];
+    chunkStream.read(b1, 0, 20);
+    matchWithInputData(b1, 50, 20);
+
+    // Next read should start from the position of the last read + 1 i.e. 70
+    byte[] b2 = new byte[20];
+    chunkStream.read(b2, 0, 20);
+    matchWithInputData(b2, 70, 20);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index 1a359fe..0e70515 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -225,15 +225,17 @@ public class Checksum {
 
   /**
    * Computes the ChecksumData for the input data and verifies that it
-   * matches with that of the input checksumData.
+   * matches with that of the input checksumData, starting from index
+   * startIndex.
    * @param byteString input data
    * @param checksumData checksumData to match with
+   * @param startIndex index of first checksum in checksumData to match with
+   *                   data's computed checksum.
    * @throws OzoneChecksumException is thrown if checksums do not match
    */
-  public static boolean verifyChecksum(
-      ByteString byteString, ChecksumData checksumData)
-      throws OzoneChecksumException {
-    return verifyChecksum(byteString.toByteArray(), checksumData);
+  public static boolean verifyChecksum(ByteString byteString,
+      ChecksumData checksumData, int startIndex) throws OzoneChecksumException {
+    return verifyChecksum(byteString.toByteArray(), checksumData, startIndex);
   }
 
   /**
@@ -245,6 +247,20 @@ public class Checksum {
    */
   public static boolean verifyChecksum(byte[] data, ChecksumData checksumData)
       throws OzoneChecksumException {
+    return verifyChecksum(data, checksumData, 0);
+  }
+
+  /**
+   * Computes the ChecksumData for the input data and verifies that it
+   * matches with that of the input checksumData.
+   * @param data input data
+   * @param checksumData checksumData to match with
+   * @param startIndex index of first checksum in checksumData to match with
+   *                   data's computed checksum.
+   * @throws OzoneChecksumException is thrown if checksums do not match
+   */
+  public static boolean verifyChecksum(byte[] data, ChecksumData checksumData,
+      int startIndex) throws OzoneChecksumException {
     ChecksumType checksumType = checksumData.getChecksumType();
     if (checksumType == ChecksumType.NONE) {
       // Checksum is set to NONE. No further verification is required.
@@ -256,7 +272,8 @@ public class Checksum {
     ChecksumData computedChecksumData =
         checksum.computeChecksum(data, 0, data.length);
 
-    return checksumData.verifyChecksumDataMatches(computedChecksumData);
+    return checksumData.verifyChecksumDataMatches(computedChecksumData,
+        startIndex);
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
index dafa0e3..c0799bb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
@@ -111,13 +111,20 @@ public class ChecksumData {
   }
 
   /**
-   * Verify that this ChecksumData matches with the input ChecksumData.
+   * Verify that this ChecksumData from startIndex to endIndex matches with the
+   * provided ChecksumData.
+   * The checksum at startIndex of this ChecksumData will be matched with the
+   * checksum at index 0 of the provided ChecksumData, and checksum at
+   * (startIndex + 1) of this ChecksumData with checksum at index 1 of
+   * provided ChecksumData and so on.
    * @param that the ChecksumData to match with
+   * @param startIndex index of the first checksum from this ChecksumData
+   *                   which will be used to compare checksums
    * @return true if checksums match
    * @throws OzoneChecksumException
    */
-  public boolean verifyChecksumDataMatches(ChecksumData that) throws
-      OzoneChecksumException {
+  public boolean verifyChecksumDataMatches(ChecksumData that, int startIndex)
+      throws OzoneChecksumException {
 
     // pre checks
     if (this.checksums.size() == 0) {
@@ -130,18 +137,22 @@ public class ChecksumData {
           "checksums");
     }
 
-    if (this.checksums.size() != that.checksums.size()) {
-      throw new OzoneChecksumException("Original and Computed checksumData's " +
-          "has different number of checksums");
-    }
+    int numChecksums = that.checksums.size();
 
-    // Verify that checksum matches at each index
-    for (int index = 0; index < this.checksums.size(); index++) {
-      if (!matchChecksumAtIndex(this.checksums.get(index),
-          that.checksums.get(index))) {
-        // checksum mismatch. throw exception.
-        throw new OzoneChecksumException(index);
+    try {
+      // Verify that checksum matches at each index
+      for (int index = 0; index < numChecksums; index++) {
+        if (!matchChecksumAtIndex(this.checksums.get(startIndex + index),
+            that.checksums.get(index))) {
+          // checksum mismatch. throw exception.
+          throw new OzoneChecksumException(index);
+        }
       }
+    } catch (ArrayIndexOutOfBoundsException e) {
+      throw new OzoneChecksumException("Computed checksum has "
+          + numChecksums + " number of checksums. Original checksum has " +
+          (this.checksums.size() - startIndex) + " number of checksums " +
+          "starting from index " + startIndex);
     }
     return true;
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 5b63420..41ac60f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -20,19 +20,10 @@ package org.apache.hadoop.ozone.client.io;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ratis.util.Preconditions;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,60 +44,93 @@ public class KeyInputStream extends InputStream implements Seekable {
 
   private static final int EOF = -1;
 
-  private final ArrayList<ChunkInputStreamEntry> streamEntries;
-  // streamOffset[i] stores the offset at which blockInputStream i stores
-  // data in the key
-  private long[] streamOffset = null;
-  private int currentStreamIndex;
+  private String key;
   private long length = 0;
   private boolean closed = false;
-  private String key;
 
-  public KeyInputStream() {
-    streamEntries = new ArrayList<>();
-    currentStreamIndex = 0;
-  }
+  // List of BlockInputStreams, one for each block in the key
+  private final List<BlockInputStream> blockStreams;
 
-  @VisibleForTesting
-  public synchronized int getCurrentStreamIndex() {
-    return currentStreamIndex;
-  }
+  // blockOffsets[i] stores the index of the first data byte in
+  // blockStream w.r.t the key data.
+  // For example, let’s say the block size is 200 bytes and block[0] stores
+  // data from indices 0 - 199, block[1] from indices 200 - 399 and so on.
+  // Then, blockOffset[0] = 0 (the offset of the first byte of data in
+  // block[0]), blockOffset[1] = 200 and so on.
+  private long[] blockOffsets = null;
 
-  @VisibleForTesting
-  public long getRemainingOfIndex(int index) throws IOException {
-    return streamEntries.get(index).getRemaining();
+  // Index of the blockStream corresponding to the current position of the
+  // KeyInputStream i.e. offset of the data to be read next
+  private int blockIndex;
+
+  // Tracks the blockIndex corresponding to the last seeked position so that it
+  // can be reset if a new position is seeked.
+  private int blockIndexOfPrevPosition;
+
+  public KeyInputStream() {
+    blockStreams = new ArrayList<>();
+    blockIndex = 0;
   }
 
   /**
-   * Append another stream to the end of the list.
-   *
-   * @param stream       the stream instance.
-   * @param streamLength the max number of bytes that should be written to this
-   *                     stream.
+   * For each block in keyInfo, add a BlockInputStream to blockStreams.
    */
-  @VisibleForTesting
-  public synchronized void addStream(BlockInputStream stream,
-      long streamLength) {
-    streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
+  public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo,
+      XceiverClientManager xceiverClientManager, String requestId,
+      boolean verifyChecksum) {
+    List<OmKeyLocationInfo> keyLocationInfos = keyInfo
+        .getLatestVersionLocations().getBlocksLatestVersionOnly();
+
+    KeyInputStream keyInputStream = new KeyInputStream();
+    keyInputStream.initialize(keyInfo.getKeyName(), keyLocationInfos,
+        xceiverClientManager, requestId, verifyChecksum);
+
+    return new LengthInputStream(keyInputStream, keyInputStream.length);
+  }
+
+  private synchronized void initialize(String keyName,
+      List<OmKeyLocationInfo> blockInfos,
+      XceiverClientManager xceiverClientManager, String requestId,
+      boolean verifyChecksum) {
+    this.key = keyName;
+    this.blockOffsets = new long[blockInfos.size()];
+    long keyLength = 0;
+    for (int i = 0; i < blockInfos.size(); i++) {
+      OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i);
+      LOG.debug("Adding stream for accessing {}. The stream will be " +
+          "initialized later.", omKeyLocationInfo);
+
+      addStream(omKeyLocationInfo, xceiverClientManager, requestId,
+          verifyChecksum);
+
+      this.blockOffsets[i] = keyLength;
+      keyLength += omKeyLocationInfo.getLength();
+    }
+    this.length = keyLength;
   }
 
   /**
-   * Append another ChunkInputStreamEntry to the end of the list.
-   * The stream will be constructed from the input information when it needs
-   * to be accessed.
+   * Append another BlockInputStream to the end of the list. Note that the
+   * BlockInputStream is only created here and not initialized. The
+   * BlockInputStream is initialized when a read operation is performed on
+   * the block for the first time.
    */
-  private synchronized void addStream(OmKeyLocationInfo omKeyLocationInfo,
+  private synchronized void addStream(OmKeyLocationInfo blockInfo,
       XceiverClientManager xceiverClientMngr, String clientRequestId,
       boolean verifyChecksum) {
-    streamEntries.add(new ChunkInputStreamEntry(omKeyLocationInfo,
-        xceiverClientMngr, clientRequestId, verifyChecksum));
+    blockStreams.add(new BlockInputStream(blockInfo.getBlockID(),
+        blockInfo.getLength(), blockInfo.getPipeline(), blockInfo.getToken(),
+        verifyChecksum, clientRequestId, xceiverClientMngr));
   }
 
-  private synchronized ChunkInputStreamEntry getStreamEntry(int index)
-      throws IOException {
-    return streamEntries.get(index).getStream();
+  @VisibleForTesting
+  public void addStream(BlockInputStream blockInputStream) {
+    blockStreams.add(blockInputStream);
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public synchronized int read() throws IOException {
     byte[] buf = new byte[1];
@@ -116,9 +140,12 @@ public class KeyInputStream extends InputStream implements Seekable {
     return Byte.toUnsignedInt(buf[0]);
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public synchronized int read(byte[] b, int off, int len) throws IOException {
-    checkNotClosed();
+    checkOpen();
     if (b == null) {
       throw new NullPointerException();
     }
@@ -131,13 +158,15 @@ public class KeyInputStream extends InputStream implements Seekable {
     int totalReadLen = 0;
     while (len > 0) {
       // if we are at the last block and have read the entire block, return
-      if (streamEntries.size() == 0 ||
-              (streamEntries.size() - 1 <= currentStreamIndex &&
-                      streamEntries.get(currentStreamIndex)
-                              .getRemaining() == 0)) {
+      if (blockStreams.size() == 0 ||
+          (blockStreams.size() - 1 <= blockIndex &&
+              blockStreams.get(blockIndex)
+                  .getRemaining() == 0)) {
         return totalReadLen == 0 ? EOF : totalReadLen;
       }
-      ChunkInputStreamEntry current = getStreamEntry(currentStreamIndex);
+
+      // Get the current blockStream and read data from it
+      BlockInputStream current = blockStreams.get(blockIndex);
       int numBytesToRead = Math.min(len, (int)current.getRemaining());
       int numBytesRead = current.read(b, off, numBytesToRead);
       if (numBytesRead != numBytesToRead) {
@@ -146,23 +175,35 @@ public class KeyInputStream extends InputStream implements Seekable {
         // this case.
         throw new IOException(String.format(
             "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
-            current.blockInputStream.getBlockID(), current.length,
-            numBytesRead));
+            current.getBlockID(), current.getLength(), numBytesRead));
       }
       totalReadLen += numBytesRead;
       off += numBytesRead;
       len -= numBytesRead;
       if (current.getRemaining() <= 0 &&
-          ((currentStreamIndex + 1) < streamEntries.size())) {
-        currentStreamIndex += 1;
+          ((blockIndex + 1) < blockStreams.size())) {
+        blockIndex += 1;
       }
     }
     return totalReadLen;
   }
 
+  /**
+   * Seeks the KeyInputStream to the specified position. This involves 2 steps:
+   *    1. Updating the blockIndex to the blockStream corresponding to the
+   *    seeked position.
+   *    2. Seeking the corresponding blockStream to the adjusted position.
+   *
+   * For example, let’s say the block size is 200 bytes and block[0] stores
+   * data from indices 0 - 199, block[1] from indices 200 - 399 and so on.
+   * Let’s say we seek to position 240. In the first step, the blockIndex
+   * would be updated to 1 as indices 200 - 399 reside in blockStream[1]. In
+   * the second step, the blockStream[1] would be seeked to position 40 (=
+   * 240 - blockOffset[1] (= 200)).
+   */
   @Override
-  public void seek(long pos) throws IOException {
-    checkNotClosed();
+  public synchronized void seek(long pos) throws IOException {
+    checkOpen();
     if (pos < 0 || pos >= length) {
       if (pos == 0) {
         // It is possible for length and pos to be zero in which case
@@ -172,35 +213,39 @@ public class KeyInputStream extends InputStream implements Seekable {
       throw new EOFException(
           "EOF encountered at pos: " + pos + " for key: " + key);
     }
-    Preconditions.assertTrue(currentStreamIndex >= 0);
-    if (currentStreamIndex >= streamEntries.size()) {
-      currentStreamIndex = Arrays.binarySearch(streamOffset, pos);
-    } else if (pos < streamOffset[currentStreamIndex]) {
-      currentStreamIndex =
-          Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos);
-    } else if (pos >= streamOffset[currentStreamIndex] + streamEntries
-        .get(currentStreamIndex).length) {
-      currentStreamIndex = Arrays
-          .binarySearch(streamOffset, currentStreamIndex + 1,
-              streamEntries.size(), pos);
+
+    // 1. Update the blockIndex
+    if (blockIndex >= blockStreams.size()) {
+      blockIndex = Arrays.binarySearch(blockOffsets, pos);
+    } else if (pos < blockOffsets[blockIndex]) {
+      blockIndex =
+          Arrays.binarySearch(blockOffsets, 0, blockIndex, pos);
+    } else if (pos >= blockOffsets[blockIndex] + blockStreams
+        .get(blockIndex).getLength()) {
+      blockIndex = Arrays
+          .binarySearch(blockOffsets, blockIndex + 1,
+              blockStreams.size(), pos);
     }
-    if (currentStreamIndex < 0) {
+    if (blockIndex < 0) {
       // Binary search returns -insertionPoint - 1  if element is not present
       // in the array. insertionPoint is the point at which element would be
-      // inserted in the sorted array. We need to adjust the currentStreamIndex
-      // accordingly so that currentStreamIndex = insertionPoint - 1
-      currentStreamIndex = -currentStreamIndex - 2;
+      // inserted in the sorted array. We need to adjust the blockIndex
+      // accordingly so that blockIndex = insertionPoint - 1
+      blockIndex = -blockIndex - 2;
     }
-    // seek to the proper offset in the BlockInputStream
-    streamEntries.get(currentStreamIndex)
-        .seek(pos - streamOffset[currentStreamIndex]);
+
+    // Reset the previous blockStream's position
+    blockStreams.get(blockIndexOfPrevPosition).resetPosition();
+
+    // 2. Seek the blockStream to the adjusted position
+    blockStreams.get(blockIndex).seek(pos - blockOffsets[blockIndex]);
+    blockIndexOfPrevPosition = blockIndex;
   }
 
   @Override
-  public long getPos() throws IOException {
-    return length == 0 ? 0 :
-        streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex)
-            .getPos();
+  public synchronized long getPos() throws IOException {
+    return length == 0 ? 0 : blockOffsets[blockIndex] +
+        blockStreams.get(blockIndex).getPos();
   }
 
   @Override
@@ -210,7 +255,7 @@ public class KeyInputStream extends InputStream implements Seekable {
 
   @Override
   public int available() throws IOException {
-    checkNotClosed();
+    checkOpen();
     long remaining = length - getPos();
     return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
   }
@@ -218,177 +263,30 @@ public class KeyInputStream extends InputStream implements Seekable {
   @Override
   public void close() throws IOException {
     closed = true;
-    for (int i = 0; i < streamEntries.size(); i++) {
-      streamEntries.get(i).close();
+    for (BlockInputStream blockStream : blockStreams) {
+      blockStream.close();
     }
   }
 
   /**
-   * Encapsulates BlockInputStream.
-   */
-  public static class ChunkInputStreamEntry extends InputStream
-      implements Seekable {
-
-    private BlockInputStream blockInputStream;
-    private final OmKeyLocationInfo blockLocationInfo;
-    private final long length;
-    private final XceiverClientManager xceiverClientManager;
-    private final String requestId;
-    private boolean verifyChecksum;
-
-    // the position of the blockInputStream is maintained by this variable
-    // till the stream is initialized
-    private long position;
-
-    public ChunkInputStreamEntry(OmKeyLocationInfo omKeyLocationInfo,
-        XceiverClientManager xceiverClientMngr, String clientRequestId,
-        boolean verifyChecksum) {
-      this.blockLocationInfo = omKeyLocationInfo;
-      this.length = omKeyLocationInfo.getLength();
-      this.xceiverClientManager = xceiverClientMngr;
-      this.requestId = clientRequestId;
-      this.verifyChecksum = verifyChecksum;
-    }
-
-    @VisibleForTesting
-    public ChunkInputStreamEntry(BlockInputStream blockInputStream,
-        long length) {
-      this.blockInputStream = blockInputStream;
-      this.length = length;
-      this.blockLocationInfo = null;
-      this.xceiverClientManager = null;
-      this.requestId = null;
-    }
-
-    private ChunkInputStreamEntry getStream() throws IOException {
-      if (this.blockInputStream == null) {
-        initializeBlockInputStream();
-      }
-      return this;
-    }
-
-    private void initializeBlockInputStream() throws IOException {
-      BlockID blockID = blockLocationInfo.getBlockID();
-      long containerID = blockID.getContainerID();
-      Pipeline pipeline = blockLocationInfo.getPipeline();
-
-      // irrespective of the container state, we will always read via Standalone
-      // protocol.
-      if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
-        pipeline = Pipeline.newBuilder(pipeline)
-            .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
-      }
-      XceiverClientSpi xceiverClient = xceiverClientManager
-          .acquireClient(pipeline);
-      boolean success = false;
-      long containerKey = blockLocationInfo.getLocalID();
-      try {
-        LOG.debug("Initializing stream for get key to access {} {}",
-            containerID, containerKey);
-        ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
-            .getDatanodeBlockIDProtobuf();
-        if (blockLocationInfo.getToken() != null) {
-          UserGroupInformation.getCurrentUser().
-              addToken(blockLocationInfo.getToken());
-        }
-        ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
-            .getBlock(xceiverClient, datanodeBlockID, requestId);
-        List<ContainerProtos.ChunkInfo> chunks =
-            response.getBlockData().getChunksList();
-        success = true;
-        this.blockInputStream = new BlockInputStream(
-            blockLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
-            chunks, requestId, verifyChecksum, position);
-      } finally {
-        if (!success) {
-          xceiverClientManager.releaseClient(xceiverClient, false);
-        }
-      }
-    }
-
-    synchronized long getRemaining() throws IOException {
-      return length - getPos();
-    }
-
-    @Override
-    public synchronized int read(byte[] b, int off, int len)
-        throws IOException {
-      int readLen = blockInputStream.read(b, off, len);
-      return readLen;
-    }
-
-    @Override
-    public synchronized int read() throws IOException {
-      int data = blockInputStream.read();
-      return data;
-    }
-
-    @Override
-    public synchronized void close() throws IOException {
-      if (blockInputStream != null) {
-        blockInputStream.close();
-      }
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      if (blockInputStream != null) {
-        blockInputStream.seek(pos);
-      } else {
-        position = pos;
-      }
-    }
-
-    @Override
-    public long getPos() throws IOException {
-      if (blockInputStream != null) {
-        return blockInputStream.getPos();
-      } else {
-        return position;
-      }
-    }
-
-    @Override
-    public boolean seekToNewSource(long targetPos) throws IOException {
-      return false;
-    }
-  }
-
-  public static LengthInputStream getFromOmKeyInfo(
-      OmKeyInfo keyInfo,
-      XceiverClientManager xceiverClientManager,
-      StorageContainerLocationProtocol
-          storageContainerLocationClient,
-      String requestId, boolean verifyChecksum) throws IOException {
-    long length = 0;
-    KeyInputStream groupInputStream = new KeyInputStream();
-    groupInputStream.key = keyInfo.getKeyName();
-    List<OmKeyLocationInfo> keyLocationInfos =
-        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
-    groupInputStream.streamOffset = new long[keyLocationInfos.size()];
-    for (int i = 0; i < keyLocationInfos.size(); i++) {
-      OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
-      LOG.debug("Adding stream for accessing {}. The stream will be " +
-          "initialized later.", omKeyLocationInfo);
-      groupInputStream.addStream(omKeyLocationInfo, xceiverClientManager,
-          requestId, verifyChecksum);
-
-      groupInputStream.streamOffset[i] = length;
-      length += omKeyLocationInfo.getLength();
-    }
-    groupInputStream.length = length;
-    return new LengthInputStream(groupInputStream, length);
-  }
-
-  /**
    * Verify that the input stream is open. Non blocking; this gives
    * the last state of the volatile {@link #closed} field.
    * @throws IOException if the connection is closed.
    */
-  private void checkNotClosed() throws IOException {
+  private void checkOpen() throws IOException {
     if (closed) {
       throw new IOException(
           ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key);
     }
   }
+
+  @VisibleForTesting
+  public synchronized int getCurrentStreamIndex() {
+    return blockIndex;
+  }
+
+  @VisibleForTesting
+  public long getRemainingOfIndex(int index) throws IOException {
+    return blockStreams.get(index).getRemaining();
+  }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 48968a4..5f2df7d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1072,8 +1072,8 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
   private OzoneInputStream createInputStream(OmKeyInfo keyInfo,
       String requestId) throws IOException {
     LengthInputStream lengthInputStream = KeyInputStream
-        .getFromOmKeyInfo(keyInfo, xceiverClientManager,
-            storageContainerLocationClient, requestId, verifyChecksum);
+        .getFromOmKeyInfo(keyInfo, xceiverClientManager, requestId,
+            verifyChecksum);
     FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
     if (feInfo != null) {
       final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index a4aa361..6876166 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -503,8 +503,7 @@ public final class DistributedStorageHandler implements StorageHandler {
         .build();
     OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
     return KeyInputStream.getFromOmKeyInfo(
-        keyInfo, xceiverClientManager, storageContainerLocationClient,
-        args.getRequestID(), verifyChecksum);
+        keyInfo, xceiverClientManager, args.getRequestID(), verifyChecksum);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index 45f04df..80717dd 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -25,7 +25,6 @@ import org.junit.rules.ExpectedException;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.util.ArrayList;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
@@ -48,8 +47,7 @@ public class TestChunkStreams {
       for (int i = 0; i < 5; i++) {
         int tempOffset = offset;
         BlockInputStream in =
-            new BlockInputStream(null, null, null, new ArrayList<>(), null,
-                true, 0) {
+            new BlockInputStream(null, 100, null, null, true, null, null) {
               private long pos = 0;
               private ByteArrayInputStream in =
                   new ByteArrayInputStream(buf, tempOffset, 100);
@@ -84,7 +82,7 @@ public class TestChunkStreams {
               }
             };
         offset += 100;
-        groupInputStream.addStream(in, 100);
+        groupInputStream.addStream(in);
       }
 
       byte[] resBuf = new byte[500];
@@ -105,8 +103,7 @@ public class TestChunkStreams {
       for (int i = 0; i < 5; i++) {
         int tempOffset = offset;
         BlockInputStream in =
-            new BlockInputStream(null, null, null, new ArrayList<>(), null,
-                true, 0) {
+            new BlockInputStream(null, 100, null, null, true, null, null) {
               private long pos = 0;
               private ByteArrayInputStream in =
                   new ByteArrayInputStream(buf, tempOffset, 100);
@@ -141,7 +138,7 @@ public class TestChunkStreams {
               }
             };
         offset += 100;
-        groupInputStream.addStream(in, 100);
+        groupInputStream.addStream(in);
       }
 
       byte[] resBuf = new byte[600];


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org