Posted to commits@ozone.apache.org by bh...@apache.org on 2019/11/06 05:26:27 UTC

[hadoop-ozone] branch master updated: HDDS-2359. Seeking randomly in a key with more than 2 blocks of data leads to inconsistent reads (#82)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9565cc5  HDDS-2359. Seeking randomly in a key with more than 2 blocks of data leads to inconsistent reads (#82)
9565cc5 is described below

commit 9565cc56444e5daa151591a4fa6992db0ac12a6f
Author: bshashikant <sh...@apache.org>
AuthorDate: Wed Nov 6 10:56:16 2019 +0530

    HDDS-2359. Seeking randomly in a key with more than 2 blocks of data leads to inconsistent reads (#82)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |  11 ++-
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  |   6 +-
 .../hadoop/ozone/client/io/KeyInputStream.java     |  13 ++-
 .../ozone/client/rpc/TestKeyInputStream.java       | 104 ++++++++++++++++++++-
 4 files changed, 125 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 40bbd93..8404d31 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -239,13 +239,15 @@ public class BlockInputStream extends InputStream implements Seekable {
       ChunkInputStream current = chunkStreams.get(chunkIndex);
       int numBytesToRead = Math.min(len, (int)current.getRemaining());
       int numBytesRead = current.read(b, off, numBytesToRead);
+
       if (numBytesRead != numBytesToRead) {
         // This implies that there is either data loss or corruption in the
         // chunk entries. Even EOF in the current stream would be covered in
         // this case.
         throw new IOException(String.format(
-            "Inconsistent read for chunkName=%s length=%d numBytesRead=%d",
-            current.getChunkName(), current.getLength(), numBytesRead));
+            "Inconsistent read for chunkName=%s length=%d numBytesToRead= %d " +
+                "numBytesRead=%d", current.getChunkName(), current.getLength(),
+            numBytesToRead, numBytesRead));
       }
       totalReadLen += numBytesRead;
       off += numBytesRead;
@@ -315,6 +317,11 @@ public class BlockInputStream extends InputStream implements Seekable {
     // Reset the previous chunkStream's position
     chunkStreams.get(chunkIndexOfPrevPosition).resetPosition();
 
+    // Reset all the chunkStreams above the chunkIndex. We do this to reset
+    // any previous reads which might have updated the chunkPosition.
+    for (int index =  chunkIndex + 1; index < chunkStreams.size(); index++) {
+      chunkStreams.get(index).seek(0);
+    }
     // seek to the proper offset in the ChunkInputStream
     chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]);
     chunkIndexOfPrevPosition = chunkIndex;
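
The loop added above is the heart of the fix: a ChunkInputStream past the target chunk can still carry a position left behind by an earlier seek or read, and a later sequential read would then resume from that stale offset instead of from the start of the chunk. A minimal, hedged sketch of the same reset pattern follows, using simplified stand-in names rather than the actual Ozone classes:

    // Simplified illustration only; "Part" stands in for ChunkInputStream /
    // BlockInputStream, and offsets[i] is the absolute start offset of part i.
    import java.io.IOException;
    import java.util.List;

    class CompositeSeekSketch {
      interface Part {
        void seek(long posWithinPart) throws IOException;
        void resetPosition();
      }

      private final List<Part> parts;
      private final long[] offsets;
      private int indexOfPrevPosition = 0;

      CompositeSeekSketch(List<Part> parts, long[] offsets) {
        this.parts = parts;
        this.offsets = offsets;
      }

      void seek(long pos, int targetIndex) throws IOException {
        // Reset the part we were previously positioned in.
        parts.get(indexOfPrevPosition).resetPosition();

        // Rewind every part after the target; an earlier seek or read may have
        // left a stale position in them, which is exactly what produced the
        // inconsistent reads reported in HDDS-2359.
        for (int i = targetIndex + 1; i < parts.size(); i++) {
          parts.get(i).seek(0);
        }

        // Position the target part at the requested offset within that part.
        parts.get(targetIndex).seek(pos - offsets[targetIndex]);
        indexOfPrevPosition = targetIndex;
      }
    }
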
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index f94d2d8..650c5ea 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -52,7 +52,6 @@ public class ChunkInputStream extends InputStream implements Seekable {
   private XceiverClientSpi xceiverClient;
   private boolean verifyChecksum;
   private boolean allocated = false;
-
   // Buffer to store the chunk data read from the DN container
   private List<ByteBuffer> buffers;
 
@@ -75,7 +74,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
 
   private static final int EOF = -1;
 
-  ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId, 
+  ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
           XceiverClientSpi xceiverClient, boolean verifyChecksum) {
     this.chunkInfo = chunkInfo;
     this.length = chunkInfo.getLen();
@@ -520,6 +519,9 @@ public class ChunkInputStream extends InputStream implements Seekable {
   private void releaseBuffers() {
     buffers = null;
     bufferIndex = 0;
+    // We should not reset bufferOffset and bufferLength here because when
+    // getPos() is called in chunkStreamsEOF() we use these values to
+    // determine whether the chunk has been read completely.
   }
 
   /**
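
For context on the comment kept above: once releaseBuffers() drops the buffers, bufferOffset and bufferLength are the only bookkeeping left for how far into the chunk the reader has advanced. A hedged, simplified illustration of that dependency (a stand-alone helper, not the actual Ozone code):

    // Assumption-laden sketch: bufferOffset is where the released buffers
    // started within the chunk and bufferLength is how many bytes they
    // covered, so their sum still yields the read position even after the
    // buffers themselves have been set to null.
    static boolean chunkFullyRead(long bufferOffset, long bufferLength,
        long chunkLength) {
      long posAfterRelease = bufferOffset + bufferLength;
      return posAfterRelease >= chunkLength;
    }

    // Example: a 100-byte chunk whose last buffers covered bytes 60..99.
    // chunkFullyRead(60, 40, 100) -> true, so EOF/getPos() checks still work.
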
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index ecbb329..ea81f43 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -175,9 +175,10 @@ public class KeyInputStream extends InputStream implements Seekable {
         // This implies that there is either data loss or corruption in the
         // chunk entries. Even EOF in the current stream would be covered in
         // this case.
-        throw new IOException(String.format(
-            "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
-            current.getBlockID(), current.getLength(), numBytesRead));
+        throw new IOException(String.format("Inconsistent read for blockID=%s "
+                        + "length=%d numBytesToRead=%d numBytesRead=%d",
+                current.getBlockID(), current.getLength(), numBytesToRead,
+                numBytesRead));
       }
       totalReadLen += numBytesRead;
       off += numBytesRead;
@@ -239,6 +240,12 @@ public class KeyInputStream extends InputStream implements Seekable {
     // Reset the previous blockStream's position
     blockStreams.get(blockIndexOfPrevPosition).resetPosition();
 
+    // Reset all the blockStreams above the blockIndex. We do this to reset
+    // any previous reads which might have updated the blockPosition and
+    // chunkIndex.
+    for (int index =  blockIndex + 1; index < blockStreams.size(); index++) {
+      blockStreams.get(index).seek(0);
+    }
     // 2. Seek the blockStream to the adjusted position
     blockStreams.get(blockIndex).seek(pos - blockOffsets[blockIndex]);
     blockIndexOfPrevPosition = blockIndex;
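
KeyInputStream applies the same reset one level up, across its BlockInputStreams, because a stale blockPosition or chunkIndex causes the same symptom for keys spanning several blocks. The access pattern that exposed the bug is essentially what the new testSeekRandomly test exercises; a hedged usage sketch (the helper name and the 100/200-byte offsets are illustrative only, and java.io.IOException plus KeyInputStream imports are assumed):

    // Illustrative only, not part of the commit.
    static void seekBackAndForth(KeyInputStream in, long keyLength)
        throws IOException {
      byte[] buf = new byte[100];

      in.seek(keyLength - 200);      // jump near the end of a multi-block key
      in.read(buf, 0, buf.length);   // served from the last block

      in.seek(0);                    // jump back to the very beginning
      in.read(buf, 0, buf.length);   // must return the first 100 bytes; before
                                     // this fix, intermediate streams could keep
                                     // a stale position and corrupt this read
    }
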
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index fa8a289..6e7e328 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -37,6 +37,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -118,6 +119,107 @@ public class TestKeyInputStream {
         .createKey(keyName, type, size, objectStore, volumeName, bucketName);
   }
 
+
+  @Test
+  public void testSeekRandomly() throws Exception {
+    XceiverClientMetrics metrics = XceiverClientManager
+        .getXceiverClientMetrics();
+
+    String keyName = getKeyName();
+    OzoneOutputStream key = ContainerTestHelper.createKey(keyName,
+        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+    // Write data spanning more than 2 blocks.
+    int dataLength = (2 * blockSize) + (chunkSize);
+
+    Random rd = new Random();
+    byte[] inputData = new byte[dataLength];
+    rd.nextBytes(inputData);
+    key.write(inputData);
+    key.close();
+
+
+    KeyInputStream keyInputStream = (KeyInputStream) objectStore
+        .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+        .getInputStream();
+
+    // Seek to somewhere near the end.
+    validate(keyInputStream, inputData, dataLength-200, 100);
+
+    // Now seek to start.
+    validate(keyInputStream, inputData, 0, 140);
+
+    validate(keyInputStream, inputData, 200, 300);
+
+    validate(keyInputStream, inputData, 30, 500);
+
+    randomSeek(dataLength, keyInputStream, inputData);
+
+    // Read entire key.
+    validate(keyInputStream, inputData, 0, dataLength);
+
+    // Repeat again and check.
+    randomSeek(dataLength, keyInputStream, inputData);
+
+    validate(keyInputStream, inputData, 0, dataLength);
+
+    keyInputStream.close();
+  }
+
+  /**
+   * This method performs random seeks and reads, and validates that the
+   * data read matches the data originally written.
+   * @param dataLength
+   * @param keyInputStream
+   * @param inputData
+   * @throws Exception
+   */
+  private void randomSeek(int dataLength, KeyInputStream keyInputStream,
+      byte[] inputData) throws Exception {
+    // Do random seek.
+    for (int i=0; i<dataLength - 300; i+=20) {
+      validate(keyInputStream, inputData, i, 200);
+    }
+
+    // Seek to the end and read in reverse order. These are partial chunk
+    // reads, as readLength is 20 while the chunk length is 100.
+    for (int i=dataLength - 100; i>=100; i-=20) {
+      validate(keyInputStream, inputData, i, 20);
+    }
+
+    // Start from the beginning and seek such that we read partial chunks.
+    for (int i=0; i<dataLength - 300; i+=20) {
+      validate(keyInputStream, inputData, i, 90);
+    }
+
+  }
+
+  /**
+   * This method seeks to the specified offset, reads readLength bytes, and
+   * validates that the data read is correct.
+   * @param keyInputStream
+   * @param inputData
+   * @param seek
+   * @param readLength
+   * @throws Exception
+   */
+  private void validate(KeyInputStream keyInputStream, byte[] inputData,
+      long seek, int readLength) throws Exception {
+    keyInputStream.seek(seek);
+
+    byte[] expectedData = new byte[readLength];
+    keyInputStream.read(expectedData, 0, readLength);
+
+    byte[] dest = new byte[readLength];
+
+    System.arraycopy(inputData, (int)seek, dest, 0, readLength);
+
+    for (int i=0; i < readLength; i++) {
+      Assert.assertEquals(expectedData[i], dest[i]);
+    }
+  }
+
+
   @Test
   public void testSeek() throws Exception {
     XceiverClientMetrics metrics = XceiverClientManager
@@ -153,8 +255,6 @@ public class TestKeyInputStream {
     // Seek operation should not result in any readChunk operation.
     Assert.assertEquals(readChunkCount, metrics
         .getContainerOpsMetrics(ContainerProtos.Type.ReadChunk));
-    Assert.assertEquals(readChunkCount, metrics
-        .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
 
     byte[] readData = new byte[chunkSize];
     keyInputStream.read(readData, 0, chunkSize);


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org