You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2015/06/16 22:41:32 UTC

hadoop git commit: HDFS-4660. Block corruption can happen during pipeline recovery. Contributed by Kihwal Lee.

Repository: hadoop
Updated Branches:
  refs/heads/trunk b578807b9 -> c74517c46


HDFS-4660. Block corruption can happen during pipeline recovery. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c74517c4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c74517c4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c74517c4

Branch: refs/heads/trunk
Commit: c74517c46bf00af408ed866b6577623cdec02de1
Parents: b578807
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Jun 16 15:39:46 2015 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Tue Jun 16 15:40:16 2015 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../hdfs/server/datanode/BlockReceiver.java     | 126 ++++++++++++++-----
 2 files changed, 96 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c74517c4/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 42588cc..b921f2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1026,6 +1026,8 @@ Release 2.7.1 - UNRELEASED
 
     HDFS-8597. Fix TestFSImage#testZeroBlockSize on Windows. (Xiaoyu Yao)
 
+    HDFS-4660. Block corruption can happen during pipeline recovery (kihwal)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c74517c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index c46892d..2468f43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -588,29 +588,59 @@ class BlockReceiver implements Closeable {
       try {
         long onDiskLen = replicaInfo.getBytesOnDisk();
         if (onDiskLen<offsetInBlock) {
-          //finally write to the disk :
-          
-          if (onDiskLen % bytesPerChecksum != 0) { 
-            // prepare to overwrite last checksum
-            adjustCrcFilePosition();
+          // Normally the beginning of an incoming packet is aligned with the
+          // existing data on disk. If the beginning packet data offset is not
+          // checksum chunk aligned, the end of packet will not go beyond the
+          // next chunk boundary.
+          // When a failure-recovery is involved, the client state and the
+          // the datanode state may not exactly agree. I.e. the client may
+          // resend part of data that is already on disk. Correct number of
+          // bytes should be skipped when writing the data and checksum
+          // buffers out to disk.
+          long partialChunkSizeOnDisk = onDiskLen % bytesPerChecksum;
+          boolean alignedOnDisk = partialChunkSizeOnDisk == 0;
+          boolean alignedInPacket = firstByteInBlock % bytesPerChecksum == 0;
+
+          // Since data is always appended, not overwritten, partial CRC
+          // recalculation is necessary if the on-disk data is not chunk-
+          // aligned, regardless of whether the beginning of the data in
+          // the packet is chunk-aligned.
+          boolean doPartialCrc = !alignedOnDisk && !shouldNotWriteChecksum;
+
+          // If this is a partial chunk, then verify that this is the only
+          // chunk in the packet. If the starting offset is not chunk
+          // aligned, the packet should terminate at or before the next
+          // chunk boundary.
+          if (!alignedInPacket && len > bytesPerChecksum) {
+            throw new IOException("Unexpected packet data length for "
+                +  block + " from " + inAddr + ": a partial chunk must be "
+                + " sent in an individual packet (data length = " + len
+                +  " > bytesPerChecksum = " + bytesPerChecksum + ")");
           }
-          
-          // If this is a partial chunk, then read in pre-existing checksum
+
+          // If the last portion of the block file is not a full chunk,
+          // then read in pre-existing partial data chunk and recalculate
+          // the checksum so that the checksum calculation can continue
+          // from the right state.
           Checksum partialCrc = null;
-          if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
+          if (doPartialCrc) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("receivePacket for " + block 
-                  + ": bytesPerChecksum=" + bytesPerChecksum                  
-                  + " does not divide firstByteInBlock=" + firstByteInBlock);
+                  + ": previous write did not end at the chunk boundary."
+                  + " onDiskLen=" + onDiskLen);
             }
             long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
                 onDiskLen / bytesPerChecksum * checksumSize;
             partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
           }
 
+          // The data buffer position where write will begin. If the packet
+          // data and on-disk data have no overlap, this will not be at the
+          // beginning of the buffer.
           int startByteToDisk = (int)(onDiskLen-firstByteInBlock) 
               + dataBuf.arrayOffset() + dataBuf.position();
 
+          // Actual number of data bytes to write.
           int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
           
           // Write data to disk.
@@ -625,31 +655,63 @@ class BlockReceiver implements Closeable {
           final byte[] lastCrc;
           if (shouldNotWriteChecksum) {
             lastCrc = null;
-          } else if (partialCrc != null) {
-            // If this is a partial chunk, then verify that this is the only
-            // chunk in the packet. Calculate new crc for this chunk.
-            if (len > bytesPerChecksum) {
-              throw new IOException("Unexpected packet data length for "
-                  +  block + " from " + inAddr + ": a partial chunk must be "
-                  + " sent in an individual packet (data length = " + len
-                  +  " > bytesPerChecksum = " + bytesPerChecksum + ")");
+          } else {
+            int skip = 0;
+            byte[] crcBytes = null;
+
+            // First, overwrite the partial crc at the end, if necessary.
+            if (doPartialCrc) { // not chunk-aligned on disk
+              // Calculate new crc for this chunk.
+              int bytesToReadForRecalc =
+                  (int)(bytesPerChecksum - partialChunkSizeOnDisk);
+              if (numBytesToDisk < bytesToReadForRecalc) {
+                bytesToReadForRecalc = numBytesToDisk;
+              }
+
+              partialCrc.update(dataBuf.array(), startByteToDisk,
+                  bytesToReadForRecalc);
+              byte[] buf = FSOutputSummer.convertToByteStream(partialCrc,
+                  checksumSize);
+              crcBytes = copyLastChunkChecksum(buf, checksumSize, buf.length);
+              // prepare to overwrite last checksum
+              adjustCrcFilePosition();
+              checksumOut.write(buf);
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("Writing out partial crc for data len " + len +
+                    ", skip=" + skip);
+              }
+              skip++; //  For the partial chunk that was just read.
             }
-            partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
-            byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
-            lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
-            checksumOut.write(buf);
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Writing out partial crc for data len " + len);
+
+            // Determine how many checksums need to be skipped up to the last
+            // boundary. The checksum after the boundary was already counted
+            // above. Only count the number of checksums skipped up to the
+            // boundary here.
+            long lastChunkBoundary = onDiskLen - (onDiskLen%bytesPerChecksum);
+            long skippedDataBytes = lastChunkBoundary - firstByteInBlock;
+
+            if (skippedDataBytes > 0) {
+              skip += (int)(skippedDataBytes / bytesPerChecksum) +
+                  ((skippedDataBytes % bytesPerChecksum == 0) ? 0 : 1);
             }
-            partialCrc = null;
-          } else {
-            // write checksum
+            skip *= checksumSize; // Convert to number of bytes
+
+            // write the rest of checksum
             final int offset = checksumBuf.arrayOffset() +
-                checksumBuf.position();
-            final int end = offset + checksumLen;
-            lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
-                end);
-            checksumOut.write(checksumBuf.array(), offset, checksumLen);
+                checksumBuf.position() + skip;
+            final int end = offset + checksumLen - skip;
+            // If offset > end, there is no more checksum to write.
+            // I.e. a partial chunk checksum rewrite happened and there is no
+            // more to write after that.
+            if (offset > end) {
+              assert crcBytes != null;
+              lastCrc = crcBytes;
+            } else {
+              final int remainingBytes = checksumLen - skip;
+              lastCrc = copyLastChunkChecksum(checksumBuf.array(),
+                  checksumSize, end);
+              checksumOut.write(checksumBuf.array(), offset, remainingBytes);
+            }
           }
 
           /// flush entire packet, sync if requested