Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/09/14 20:45:46 UTC
hadoop git commit: HDDS-419. ChunkInputStream bulk read api does not
read from all the chunks. Contributed by Lokesh Jain and Mukul Kumar.
Repository: hadoop
Updated Branches:
refs/heads/trunk 488806bac -> 6f037468b
HDDS-419. ChunkInputStream bulk read api does not read from all the chunks. Contributed by Lokesh Jain and Mukul Kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f037468
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f037468
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f037468
Branch: refs/heads/trunk
Commit: 6f037468bce7bbda6b9fc01166f2c61ae40b690b
Parents: 488806b
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Sep 14 13:34:29 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Sep 14 13:45:38 2018 -0700
----------------------------------------------------------------------
.../hdds/scm/storage/ChunkInputStream.java | 30 +++++++++++++++-----
.../ozone/client/io/ChunkGroupInputStream.java | 25 ++++++++--------
.../hadoop/ozone/freon/TestDataValidate.java | 4 +--
.../hadoop/ozone/freon/RandomKeyGenerator.java | 7 +++--
4 files changed, 42 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f037468/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index a969b68..a483197 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -121,12 +121,17 @@ public class ChunkInputStream extends InputStream implements Seekable {
return 0;
}
checkOpen();
- int available = prepareRead(len);
- if (available == EOF) {
- return EOF;
+ int total = 0;
+ while (len > 0) {
+ int available = prepareRead(len);
+ if (available == EOF) {
+ return total != 0 ? total : EOF;
+ }
+ buffers.get(bufferIndex).get(b, off + total, available);
+ len -= available;
+ total += available;
}
- buffers.get(bufferIndex).get(b, off, available);
- return available;
+ return total;
}
@Override
@@ -196,13 +201,20 @@ public class ChunkInputStream extends InputStream implements Seekable {
// next chunk
chunkIndex += 1;
final ReadChunkResponseProto readChunkResponse;
+ final ChunkInfo chunkInfo = chunks.get(chunkIndex);
try {
- readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
- chunks.get(chunkIndex), blockID, traceID);
+ readChunkResponse = ContainerProtocolCalls
+ .readChunk(xceiverClient, chunkInfo, blockID, traceID);
} catch (IOException e) {
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
}
ByteString byteString = readChunkResponse.getData();
+ if (byteString.size() != chunkInfo.getLen()) {
+ // Bytes read from chunk should be equal to chunk size.
+ throw new IOException(String
+ .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
+ chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size()));
+ }
buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
}
@@ -260,4 +272,8 @@ public class ChunkInputStream extends InputStream implements Seekable {
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
+
+ public BlockID getBlockID() {
+ return blockID;
+ }
}
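
Note on the ChunkInputStream change above: the bulk read(byte[], int, int) now loops over prepareRead() until either the requested length is satisfied or EOF is reached, instead of returning after copying from a single chunk buffer. The following is a minimal, self-contained sketch of that loop shape, using plain ByteBuffers in place of ChunkInputStream's internal chunk buffers; the class name BulkReadSketch and the simplified prepareRead() are illustrative only and not part of the patch.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class BulkReadSketch {
  private static final int EOF = -1;

  private final List<ByteBuffer> buffers;   // stands in for the chunk buffers
  private int bufferIndex = 0;

  public BulkReadSketch(List<ByteBuffer> buffers) {
    this.buffers = buffers;
  }

  // Mirrors the patched read(byte[], int, int): keep pulling from successive
  // buffers until 'len' bytes are copied or no more data is available.
  public int read(byte[] b, int off, int len) {
    int total = 0;
    while (len > 0) {
      int available = prepareRead(len);
      if (available == EOF) {
        return total != 0 ? total : EOF;
      }
      buffers.get(bufferIndex).get(b, off + total, available);
      len -= available;
      total += available;
    }
    return total;
  }

  // Simplified stand-in for ChunkInputStream#prepareRead: advance to the next
  // buffer that still has data and report how much of 'len' it can serve.
  private int prepareRead(int len) {
    while (bufferIndex < buffers.size()
        && !buffers.get(bufferIndex).hasRemaining()) {
      bufferIndex++;
    }
    if (bufferIndex >= buffers.size()) {
      return EOF;
    }
    return Math.min(len, buffers.get(bufferIndex).remaining());
  }

  public static void main(String[] args) {
    // Two 4-byte "chunks"; a single bulk read of 8 bytes now spans both.
    BulkReadSketch in = new BulkReadSketch(Arrays.asList(
        ByteBuffer.wrap(new byte[] {1, 2, 3, 4}),
        ByteBuffer.wrap(new byte[] {5, 6, 7, 8})));
    byte[] out = new byte[8];
    System.out.println(in.read(out, 0, 8) + " bytes: " + Arrays.toString(out));
    // Prints: 8 bytes: [1, 2, 3, 4, 5, 6, 7, 8]
  }
}
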
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f037468/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
index 742cfcc..94966f6 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -115,19 +115,20 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
return totalReadLen == 0 ? EOF : totalReadLen;
}
ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
- int readLen = Math.min(len, (int)current.getRemaining());
- int actualLen = current.read(b, off, readLen);
- // this means the underlying stream has nothing at all, return
- if (actualLen == EOF) {
- return totalReadLen > 0 ? totalReadLen : EOF;
+ int numBytesToRead = Math.min(len, (int)current.getRemaining());
+ int numBytesRead = current.read(b, off, numBytesToRead);
+ if (numBytesRead != numBytesToRead) {
+ // This implies that there is either data loss or corruption in the
+ // chunk entries. Even EOF in the current stream would be covered in
+ // this case.
+ throw new IOException(String.format(
+ "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
+ current.chunkInputStream.getBlockID(), current.length,
+ numBytesRead));
}
- totalReadLen += actualLen;
- // this means there is no more data to read beyond this point, return
- if (actualLen != readLen) {
- return totalReadLen;
- }
- off += readLen;
- len -= readLen;
+ totalReadLen += numBytesRead;
+ off += numBytesRead;
+ len -= numBytesRead;
if (current.getRemaining() <= 0) {
currentStreamIndex += 1;
}
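
Note on the ChunkGroupInputStream change above: a short read from the underlying chunk stream is no longer treated as a benign end-of-data condition but as evidence of data loss or corruption, because the stream entry's own bookkeeping says the bytes should exist. A rough, standalone sketch of that check in isolation (checkShortRead and its arguments are illustrative; only the fail-fast idea and the message shape come from the patch):

import java.io.IOException;

public final class ShortReadCheck {

  // Fail fast when fewer bytes arrive than the stream's remaining length promised.
  static void checkShortRead(String blockId, long expectedLen,
      int numBytesToRead, int numBytesRead) throws IOException {
    if (numBytesRead != numBytesToRead) {
      throw new IOException(String.format(
          "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
          blockId, expectedLen, numBytesRead));
    }
  }

  public static void main(String[] args) throws IOException {
    checkShortRead("block-1", 4096, 1024, 1024);   // ok, full read
    checkShortRead("block-2", 4096, 1024, 512);    // throws IOException
  }
}
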
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f037468/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index fdce736..a2df50d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -68,7 +68,7 @@ public class TestDataValidate {
randomKeyGenerator.setNumOfKeys(1);
randomKeyGenerator.setType(ReplicationType.RATIS);
randomKeyGenerator.setFactor(ReplicationFactor.THREE);
- randomKeyGenerator.setKeySize(104857600);
+ randomKeyGenerator.setKeySize(20971520);
randomKeyGenerator.setValidateWrites(true);
randomKeyGenerator.call();
Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
@@ -84,7 +84,7 @@ public class TestDataValidate {
randomKeyGenerator.setNumOfVolumes(1);
randomKeyGenerator.setNumOfBuckets(1);
randomKeyGenerator.setNumOfKeys(1);
- randomKeyGenerator.setKeySize(104857600);
+ randomKeyGenerator.setKeySize(20971520);
randomKeyGenerator.setValidateWrites(true);
randomKeyGenerator.call();
Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
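
For context on the new test value above: 20971520 bytes is 20 MiB, replacing the earlier 100 MiB key size of 104857600 bytes. A quick arithmetic note (constant names are illustrative only):

public class KeySizes {
  public static void main(String[] args) {
    long oldKeySize = 100L * 1024 * 1024; // 104857600 bytes = 100 MiB
    long newKeySize = 20L * 1024 * 1024;  //  20971520 bytes =  20 MiB
    System.out.println(oldKeySize + " -> " + newKeySize);
  }
}
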
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f037468/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
index ee4cc87..d73e37e 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -984,9 +985,9 @@ public final class RandomKeyGenerator implements Callable<Void> {
writeValidationFailureCount++;
LOG.warn("Data validation error for key {}/{}/{}",
kv.bucket.getVolumeName(), kv.bucket, kv.key);
- LOG.warn("Expected: {}, Actual: {}",
- DFSUtil.bytes2String(kv.value),
- DFSUtil.bytes2String(value));
+ LOG.warn("Expected checksum: {}, Actual checksum: {}",
+ DigestUtils.md5Hex(kv.value),
+ DigestUtils.md5Hex(value));
}
}
} catch (IOException | InterruptedException ex) {
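
Note on the RandomKeyGenerator logging change above: rather than dumping the full expected and actual values (now up to 20 MiB each) into the log, the validator logs MD5 digests via commons-codec. A small illustration of what that comparison looks like; the sample byte arrays and class name are made up for the example, while DigestUtils.md5Hex is the real commons-codec call used in the patch (commons-codec must be on the classpath):

import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.digest.DigestUtils;

public class ChecksumLogSketch {
  public static void main(String[] args) {
    byte[] expected = "hello ozone".getBytes(StandardCharsets.UTF_8);
    byte[] actual   = "hello ozonE".getBytes(StandardCharsets.UTF_8);
    // Compact, fixed-length fingerprints instead of multi-megabyte dumps.
    System.out.println("Expected checksum: " + DigestUtils.md5Hex(expected));
    System.out.println("Actual checksum:   " + DigestUtils.md5Hex(actual));
  }
}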