You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/02/02 19:06:52 UTC
hadoop git commit: HDFS-4660. Block corruption can happen during
pipeline recovery. Contributed by Kihwal Lee.
Repository: hadoop
Updated Branches:
refs/heads/branch-2.6 b52279ca2 -> 572b5872f
HDFS-4660. Block corruption can happen during pipeline recovery. Contributed by Kihwal Lee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/572b5872
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/572b5872
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/572b5872
Branch: refs/heads/branch-2.6
Commit: 572b5872fee1e2f3b6bf87afe272f899bfdf7f44
Parents: b52279c
Author: Junping Du <ju...@apache.org>
Authored: Tue Feb 2 10:16:59 2016 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Tue Feb 2 10:16:59 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../hdfs/server/datanode/BlockReceiver.java | 126 ++++++++++++++-----
2 files changed, 96 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/572b5872/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 260ab38..612da31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -41,6 +41,8 @@ Release 2.6.4 - UNRELEASED
HDFS-9294. DFSClient deadlock when close file and failed to renew lease.
(Brahma Reddy Battula via szetszwo)
+ HDFS-4660. Block corruption can happen during pipeline recovery (kihwal)
+
Release 2.6.3 - 2015-12-17
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/572b5872/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 2a6b46a..4b6668b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -583,29 +583,59 @@ class BlockReceiver implements Closeable {
try {
long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen<offsetInBlock) {
- //finally write to the disk :
-
- if (onDiskLen % bytesPerChecksum != 0) {
- // prepare to overwrite last checksum
- adjustCrcFilePosition();
+ // Normally the beginning of an incoming packet is aligned with the
+ // existing data on disk. If the beginning packet data offset is not
+ // checksum chunk aligned, the end of packet will not go beyond the
+ // next chunk boundary.
+ // When a failure-recovery is involved, the client state and the
+ // datanode state may not exactly agree. I.e. the client may
+ // resend part of data that is already on disk. Correct number of
+ // bytes should be skipped when writing the data and checksum
+ // buffers out to disk.
+ long partialChunkSizeOnDisk = onDiskLen % bytesPerChecksum;
+ boolean alignedOnDisk = partialChunkSizeOnDisk == 0;
+ boolean alignedInPacket = firstByteInBlock % bytesPerChecksum == 0;
+
+ // Since data is always appended, not overwritten, partial CRC
+ // recalculation is necessary if the on-disk data is not chunk-
+ // aligned, regardless of whether the beginning of the data in
+ // the packet is chunk-aligned.
+ boolean doPartialCrc = !alignedOnDisk && !shouldNotWriteChecksum;
+
+ // If this is a partial chunk, then verify that this is the only
+ // chunk in the packet. If the starting offset is not chunk
+ // aligned, the packet should terminate at or before the next
+ // chunk boundary.
+ if (!alignedInPacket && len > bytesPerChecksum) {
+ throw new IOException("Unexpected packet data length for "
+ + block + " from " + inAddr + ": a partial chunk must be "
+ + " sent in an individual packet (data length = " + len
+ + " > bytesPerChecksum = " + bytesPerChecksum + ")");
}
-
- // If this is a partial chunk, then read in pre-existing checksum
+
+ // If the last portion of the block file is not a full chunk,
+ // then read in pre-existing partial data chunk and recalculate
+ // the checksum so that the checksum calculation can continue
+ // from the right state.
Checksum partialCrc = null;
- if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
+ if (doPartialCrc) {
if (LOG.isDebugEnabled()) {
LOG.debug("receivePacket for " + block
- + ": bytesPerChecksum=" + bytesPerChecksum
- + " does not divide firstByteInBlock=" + firstByteInBlock);
+ + ": previous write did not end at the chunk boundary."
+ + " onDiskLen=" + onDiskLen);
}
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
onDiskLen / bytesPerChecksum * checksumSize;
partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
}
+ // The data buffer position where write will begin. If the packet
+ // data and on-disk data have no overlap, this will not be at the
+ // beginning of the buffer.
int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
+ dataBuf.arrayOffset() + dataBuf.position();
+ // Actual number of data bytes to write.
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk.
@@ -620,31 +650,63 @@ class BlockReceiver implements Closeable {
final byte[] lastCrc;
if (shouldNotWriteChecksum) {
lastCrc = null;
- } else if (partialCrc != null) {
- // If this is a partial chunk, then verify that this is the only
- // chunk in the packet. Calculate new crc for this chunk.
- if (len > bytesPerChecksum) {
- throw new IOException("Unexpected packet data length for "
- + block + " from " + inAddr + ": a partial chunk must be "
- + " sent in an individual packet (data length = " + len
- + " > bytesPerChecksum = " + bytesPerChecksum + ")");
+ } else {
+ int skip = 0;
+ byte[] crcBytes = null;
+
+ // First, overwrite the partial crc at the end, if necessary.
+ if (doPartialCrc) { // not chunk-aligned on disk
+ // Calculate new crc for this chunk.
+ int bytesToReadForRecalc =
+ (int)(bytesPerChecksum - partialChunkSizeOnDisk);
+ if (numBytesToDisk < bytesToReadForRecalc) {
+ bytesToReadForRecalc = numBytesToDisk;
+ }
+
+ partialCrc.update(dataBuf.array(), startByteToDisk,
+ bytesToReadForRecalc);
+ byte[] buf = FSOutputSummer.convertToByteStream(partialCrc,
+ checksumSize);
+ crcBytes = copyLastChunkChecksum(buf, checksumSize, buf.length);
+ // prepare to overwrite last checksum
+ adjustCrcFilePosition();
+ checksumOut.write(buf);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Writing out partial crc for data len " + len +
+ ", skip=" + skip);
+ }
+ skip++; // For the partial chunk that was just read.
}
- partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
- byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
- lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
- checksumOut.write(buf);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Writing out partial crc for data len " + len);
+
+ // Determine how many checksums need to be skipped up to the last
+ // boundary. The checksum after the boundary was already counted
+ // above. Only count the number of checksums skipped up to the
+ // boundary here.
+ long lastChunkBoundary = onDiskLen - (onDiskLen%bytesPerChecksum);
+ long skippedDataBytes = lastChunkBoundary - firstByteInBlock;
+
+ if (skippedDataBytes > 0) {
+ skip += (int)(skippedDataBytes / bytesPerChecksum) +
+ ((skippedDataBytes % bytesPerChecksum == 0) ? 0 : 1);
}
- partialCrc = null;
- } else {
- // write checksum
+ skip *= checksumSize; // Convert to number of bytes
+
+ // write the rest of checksum
final int offset = checksumBuf.arrayOffset() +
- checksumBuf.position();
- final int end = offset + checksumLen;
- lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
- end);
- checksumOut.write(checksumBuf.array(), offset, checksumLen);
+ checksumBuf.position() + skip;
+ final int end = offset + checksumLen - skip;
+ // If offset > end, there is no more checksum to write.
+ // I.e. a partial chunk checksum rewrite happened and there is no
+ // more to write after that.
+ if (offset > end) {
+ assert crcBytes != null;
+ lastCrc = crcBytes;
+ } else {
+ final int remainingBytes = checksumLen - skip;
+ lastCrc = copyLastChunkChecksum(checksumBuf.array(),
+ checksumSize, end);
+ checksumOut.write(checksumBuf.array(), offset, remainingBytes);
+ }
}
/// flush entire packet, sync if requested