You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by bh...@apache.org on 2019/11/06 05:26:27 UTC
[hadoop-ozone] branch master updated: HDDS-2359. Seeking randomly
in a key with more than 2 blocks of data leads to inconsistent reads (#82)
This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 9565cc5 HDDS-2359. Seeking randomly in a key with more than 2 blocks of data leads to inconsistent reads (#82)
9565cc5 is described below
commit 9565cc56444e5daa151591a4fa6992db0ac12a6f
Author: bshashikant <sh...@apache.org>
AuthorDate: Wed Nov 6 10:56:16 2019 +0530
HDDS-2359. Seeking randomly in a key with more than 2 blocks of data leads to inconsistent reads (#82)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 11 ++-
.../hadoop/hdds/scm/storage/ChunkInputStream.java | 6 +-
.../hadoop/ozone/client/io/KeyInputStream.java | 13 ++-
.../ozone/client/rpc/TestKeyInputStream.java | 104 ++++++++++++++++++++-
4 files changed, 125 insertions(+), 9 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 40bbd93..8404d31 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -239,13 +239,15 @@ public class BlockInputStream extends InputStream implements Seekable {
ChunkInputStream current = chunkStreams.get(chunkIndex);
int numBytesToRead = Math.min(len, (int)current.getRemaining());
int numBytesRead = current.read(b, off, numBytesToRead);
+
if (numBytesRead != numBytesToRead) {
// This implies that there is either data loss or corruption in the
// chunk entries. Even EOF in the current stream would be covered in
// this case.
throw new IOException(String.format(
- "Inconsistent read for chunkName=%s length=%d numBytesRead=%d",
- current.getChunkName(), current.getLength(), numBytesRead));
+ "Inconsistent read for chunkName=%s length=%d numBytesToRead= %d " +
+ "numBytesRead=%d", current.getChunkName(), current.getLength(),
+ numBytesToRead, numBytesRead));
}
totalReadLen += numBytesRead;
off += numBytesRead;
@@ -315,6 +317,11 @@ public class BlockInputStream extends InputStream implements Seekable {
// Reset the previous chunkStream's position
chunkStreams.get(chunkIndexOfPrevPosition).resetPosition();
+ // Reset all the chunkStreams above the chunkIndex. We do this to reset
+ // any previous reads which might have updated the chunkPosition.
+ for (int index = chunkIndex + 1; index < chunkStreams.size(); index++) {
+ chunkStreams.get(index).seek(0);
+ }
// seek to the proper offset in the ChunkInputStream
chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]);
chunkIndexOfPrevPosition = chunkIndex;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index f94d2d8..650c5ea 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -52,7 +52,6 @@ public class ChunkInputStream extends InputStream implements Seekable {
private XceiverClientSpi xceiverClient;
private boolean verifyChecksum;
private boolean allocated = false;
-
// Buffer to store the chunk data read from the DN container
private List<ByteBuffer> buffers;
@@ -75,7 +74,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
private static final int EOF = -1;
- ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
+ ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
XceiverClientSpi xceiverClient, boolean verifyChecksum) {
this.chunkInfo = chunkInfo;
this.length = chunkInfo.getLen();
@@ -520,6 +519,9 @@ public class ChunkInputStream extends InputStream implements Seekable {
private void releaseBuffers() {
buffers = null;
bufferIndex = 0;
+ // Deliberately keep bufferOffset and bufferLength: getPos(), which is
+ // called from chunkStreamsEOF(), relies on them to decide whether the
+ // chunk has been read completely.
}
/**
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index ecbb329..ea81f43 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -175,9 +175,10 @@ public class KeyInputStream extends InputStream implements Seekable {
// This implies that there is either data loss or corruption in the
// chunk entries. Even EOF in the current stream would be covered in
// this case.
- throw new IOException(String.format(
- "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
- current.getBlockID(), current.getLength(), numBytesRead));
+ throw new IOException(String.format("Inconsistent read for blockID=%s "
+ + "length=%d numBytesToRead=%d numBytesRead=%d",
+ current.getBlockID(), current.getLength(), numBytesToRead,
+ numBytesRead));
}
totalReadLen += numBytesRead;
off += numBytesRead;
@@ -239,6 +240,12 @@ public class KeyInputStream extends InputStream implements Seekable {
// Reset the previous blockStream's position
blockStreams.get(blockIndexOfPrevPosition).resetPosition();
+ // Reset all the blockStreams above the blockIndex. We do this to reset
+ // any previous reads which might have updated the blockPosition and
+ // chunkIndex.
+ for (int index = blockIndex + 1; index < blockStreams.size(); index++) {
+ blockStreams.get(index).seek(0);
+ }
// 2. Seek the blockStream to the adjusted position
blockStreams.get(blockIndex).seek(pos - blockOffsets[blockIndex]);
blockIndexOfPrevPosition = blockIndex;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index fa8a289..6e7e328 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -37,6 +37,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -118,6 +119,107 @@ public class TestKeyInputStream {
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
}
+
+ @Test
+ public void testSeekRandomly() throws Exception {
+ XceiverClientMetrics metrics = XceiverClientManager
+ .getXceiverClientMetrics();
+
+ String keyName = getKeyName();
+ OzoneOutputStream key = ContainerTestHelper.createKey(keyName,
+ ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+ // write data of more than 2 blocks.
+ int dataLength = (2 * blockSize) + (chunkSize);
+
+ Random rd = new Random();
+ byte[] inputData = new byte[dataLength];
+ rd.nextBytes(inputData);
+ key.write(inputData);
+ key.close();
+
+
+ KeyInputStream keyInputStream = (KeyInputStream) objectStore
+ .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+ .getInputStream();
+
+ // Seek to some where end.
+ validate(keyInputStream, inputData, dataLength-200, 100);
+
+ // Now seek to start.
+ validate(keyInputStream, inputData, 0, 140);
+
+ validate(keyInputStream, inputData, 200, 300);
+
+ validate(keyInputStream, inputData, 30, 500);
+
+ randomSeek(dataLength, keyInputStream, inputData);
+
+ // Read entire key.
+ validate(keyInputStream, inputData, 0, dataLength);
+
+ // Repeat again and check.
+ randomSeek(dataLength, keyInputStream, inputData);
+
+ validate(keyInputStream, inputData, 0, dataLength);
+
+ keyInputStream.close();
+ }
+
+ /**
+ * Validates reads after a deterministic (despite the name) series of seeks:
+ * two forward sweeps with different read sizes and one backward sweep.
+ * @param dataLength total number of bytes written to the key
+ * @param keyInputStream stream to seek and read from
+ * @param inputData the data originally written to the key
+ * @throws Exception propagated from validate on a failed seek or read
+ */
+ private void randomSeek(int dataLength, KeyInputStream keyInputStream,
+ byte[] inputData) throws Exception {
+ // Forward sweep: seek in steps of 20 bytes and read 200 bytes each time.
+ for (int i=0; i<dataLength - 300; i+=20) {
+ validate(keyInputStream, inputData, i, 200);
+ }
+
+ // Backward sweep from near the end, reading 20 bytes at a time, i.e.
+ // partial chunks (NOTE(review): assumes chunkSize is 100 - confirm).
+ for (int i=dataLength - 100; i>=100; i-=20) {
+ validate(keyInputStream, inputData, i, 20);
+ }
+
+ // Forward sweep reading 90 bytes, so reads may start/end mid-chunk.
+ for (int i=0; i<dataLength - 300; i+=20) {
+ validate(keyInputStream, inputData, i, 90);
+ }
+
+ }
+
+  /**
+   * Seeks to {@code seek}, reads {@code readLength} bytes and asserts that
+   * they match the corresponding range of the data originally written.
+   * @param keyInputStream stream to seek and read from
+   * @param inputData the data that was written to the key
+   * @param seek offset to seek to before reading
+   * @param readLength number of bytes to read and compare
+   * @throws Exception if seeking or reading fails
+   */
+  private void validate(KeyInputStream keyInputStream, byte[] inputData,
+      long seek, int readLength) throws Exception {
+    keyInputStream.seek(seek);
+
+    byte[] readData = new byte[readLength];
+    int numBytesRead = keyInputStream.read(readData, 0, readLength);
+    // Report short reads directly rather than as a confusing data mismatch.
+    Assert.assertEquals(readLength, numBytesRead);
+
+    byte[] expectedData = new byte[readLength];
+    System.arraycopy(inputData, (int) seek, expectedData, 0, readLength);
+
+    // Expected value first, so failure messages are not inverted.
+    Assert.assertArrayEquals(expectedData, readData);
+  }
+
+
@Test
public void testSeek() throws Exception {
XceiverClientMetrics metrics = XceiverClientManager
@@ -153,8 +255,6 @@ public class TestKeyInputStream {
// Seek operation should not result in any readChunk operation.
Assert.assertEquals(readChunkCount, metrics
.getContainerOpsMetrics(ContainerProtos.Type.ReadChunk));
- Assert.assertEquals(readChunkCount, metrics
- .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
byte[] readData = new byte[chunkSize];
keyInputStream.read(readData, 0, chunkSize);
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org