You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/06/23 09:42:13 UTC

hadoop git commit: HDFS-10460. Recompute block checksum for a particular range less than file size on the fly by reconstructing missed block. Contributed by Rakesh R

Repository: hadoop
Updated Branches:
  refs/heads/trunk ff07b1080 -> e6cb07520


HDFS-10460. Recompute block checksum for a particular range less than file size on the fly by reconstructing missed block. Contributed by Rakesh R


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6cb0752
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6cb0752
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6cb0752

Branch: refs/heads/trunk
Commit: e6cb07520f935efde3e881de8f84ee7f6e0a746f
Parents: ff07b10
Author: Kai Zheng <ka...@intel.com>
Authored: Fri Jun 24 17:39:32 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Fri Jun 24 17:39:32 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/FileChecksumHelper.java  |  13 +-
 .../datatransfer/DataTransferProtocol.java      |   5 +-
 .../hdfs/protocol/datatransfer/Sender.java      |   4 +-
 .../src/main/proto/datatransfer.proto           |   1 +
 .../hdfs/protocol/datatransfer/Receiver.java    |   3 +-
 .../server/datanode/BlockChecksumHelper.java    |  81 +++--
 .../hdfs/server/datanode/DataXceiver.java       |   5 +-
 .../StripedBlockChecksumReconstructor.java      |  69 ++++-
 .../apache/hadoop/hdfs/TestFileChecksum.java    | 309 +++++++++++++++++--
 9 files changed, 415 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
index c213fa3..fe462f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -454,10 +454,11 @@ final class FileChecksumHelper {
     private boolean checksumBlockGroup(
         LocatedStripedBlock blockGroup) throws IOException {
       ExtendedBlock block = blockGroup.getBlock();
+      long requestedNumBytes = block.getNumBytes();
       if (getRemaining() < block.getNumBytes()) {
-        block.setNumBytes(getRemaining());
+        requestedNumBytes = getRemaining();
       }
-      setRemaining(getRemaining() - block.getNumBytes());
+      setRemaining(getRemaining() - requestedNumBytes);
 
       StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
           blockGroup.getLocations(), blockGroup.getBlockTokens(),
@@ -468,7 +469,8 @@ final class FileChecksumHelper {
       boolean done = false;
       for (int j = 0; !done && j < datanodes.length; j++) {
         try {
-          tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]);
+          tryDatanode(blockGroup, stripedBlockInfo, datanodes[j],
+              requestedNumBytes);
           done = true;
         } catch (InvalidBlockTokenException ibte) {
           if (bgIdx > getLastRetriedIndex()) {
@@ -496,7 +498,8 @@ final class FileChecksumHelper {
      */
     private void tryDatanode(LocatedStripedBlock blockGroup,
                              StripedBlockInfo stripedBlockInfo,
-                             DatanodeInfo datanode) throws IOException {
+                             DatanodeInfo datanode,
+                             long requestedNumBytes) throws IOException {
 
       try (IOStreamPair pair = getClient().connectToDN(datanode,
           getTimeout(), blockGroup.getBlockToken())) {
@@ -506,7 +509,7 @@ final class FileChecksumHelper {
 
         // get block MD5
         createSender(pair).blockGroupChecksum(stripedBlockInfo,
-            blockGroup.getBlockToken());
+            blockGroup.getBlockToken(), requestedNumBytes);
 
         BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
             PBHelperClient.vintPrefixed(pair.in));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index ad3f2ad..94f8906 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -207,8 +207,11 @@ public interface DataTransferProtocol {
    *
    * @param stripedBlockInfo a striped block info.
    * @param blockToken security token for accessing the block.
+   * @param requestedNumBytes requested number of bytes in the block group
+   *                          to compute the checksum.
    * @throws IOException
    */
   void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
-          Token<BlockTokenIdentifier> blockToken) throws IOException;
+          Token<BlockTokenIdentifier> blockToken,
+          long requestedNumBytes) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index bc73bfc..e133975 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -266,7 +266,8 @@ public class Sender implements DataTransferProtocol {
 
   @Override
   public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
-         Token<BlockTokenIdentifier> blockToken) throws IOException {
+      Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
+          throws IOException {
     OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildBaseHeader(
             stripedBlockInfo.getBlock(), blockToken))
@@ -278,6 +279,7 @@ public class Sender implements DataTransferProtocol {
             .convertBlockIndices(stripedBlockInfo.getBlockIndices()))
         .setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
             stripedBlockInfo.getErasureCodingPolicy()))
+        .setRequestedNumBytes(requestedNumBytes)
         .build();
 
     send(out, Op.BLOCK_GROUP_CHECKSUM, proto);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 1407351..290b158 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -155,6 +155,7 @@ message OpBlockGroupChecksumProto {
   repeated hadoop.common.TokenProto blockTokens = 3;
   required ErasureCodingPolicyProto ecPolicy = 4;
   repeated uint32 blockIndices = 5;
+  required uint64 requestedNumBytes = 6;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index 8b863f7..08ab967 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -312,7 +312,8 @@ public abstract class Receiver implements DataTransferProtocol {
 
     try {
       blockGroupChecksum(stripedBlockInfo,
-          PBHelperClient.convert(proto.getHeader().getToken()));
+          PBHelperClient.convert(proto.getHeader().getToken()),
+          proto.getRequestedNumBytes());
     } finally {
       if (traceScope != null) {
         traceScope.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
index ec6bbb6..f549785 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -285,10 +285,8 @@ final class BlockChecksumHelper {
         }
         setOutBytes(md5out.getDigest());
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("block=" + getBlock() + ", bytesPerCRC=" + getBytesPerCRC()
-              + ", crcPerBlock=" + getCrcPerBlock() + ", md5out=" + md5out);
-        }
+        LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}",
+            getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out);
       } finally {
         IOUtils.closeStream(getChecksumIn());
         IOUtils.closeStream(getMetadataIn());
@@ -335,11 +333,13 @@ final class BlockChecksumHelper {
     private final DatanodeInfo[] datanodes;
     private final Token<BlockTokenIdentifier>[] blockTokens;
     private final byte[] blockIndices;
+    private final long requestedNumBytes;
 
     private final DataOutputBuffer md5writer = new DataOutputBuffer();
 
     BlockGroupNonStripedChecksumComputer(DataNode datanode,
-                                         StripedBlockInfo stripedBlockInfo)
+                                         StripedBlockInfo stripedBlockInfo,
+                                         long requestedNumBytes)
         throws IOException {
       super(datanode);
       this.blockGroup = stripedBlockInfo.getBlock();
@@ -347,6 +347,7 @@ final class BlockChecksumHelper {
       this.datanodes = stripedBlockInfo.getDatanodes();
       this.blockTokens = stripedBlockInfo.getBlockTokens();
       this.blockIndices = stripedBlockInfo.getBlockIndices();
+      this.requestedNumBytes = requestedNumBytes;
     }
 
     private static class LiveBlockInfo {
@@ -380,24 +381,29 @@ final class BlockChecksumHelper {
         liveDns.put(blockIndices[idx],
             new LiveBlockInfo(datanodes[idx], blockTokens[idx]));
       }
+      long checksumLen = 0;
       for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) {
         try {
+          ExtendedBlock block = getInternalBlock(numDataUnits, idx);
+
           LiveBlockInfo liveBlkInfo = liveDns.get((byte) idx);
           if (liveBlkInfo == null) {
             // reconstruct block and calculate checksum for missing node
-            recalculateChecksum(idx);
+            recalculateChecksum(idx, block.getNumBytes());
           } else {
             try {
-              ExtendedBlock block = StripedBlockUtil.constructInternalBlock(
-                  blockGroup, ecPolicy.getCellSize(), numDataUnits, idx);
               checksumBlock(block, idx, liveBlkInfo.getToken(),
                   liveBlkInfo.getDn());
             } catch (IOException ioe) {
               LOG.warn("Exception while reading checksum", ioe);
               // reconstruct block and calculate checksum for the failed node
-              recalculateChecksum(idx);
+              recalculateChecksum(idx, block.getNumBytes());
             }
           }
+          checksumLen += block.getNumBytes();
+          if (checksumLen >= requestedNumBytes) {
+            break; // done with the computation, simply return.
+          }
         } catch (IOException e) {
           LOG.warn("Failed to get the checksum", e);
         }
@@ -407,6 +413,20 @@ final class BlockChecksumHelper {
       setOutBytes(md5out.getDigest());
     }
 
+    private ExtendedBlock getInternalBlock(int numDataUnits, int idx) {
+      // Sets requested number of bytes in blockGroup which is required to
+      // construct the internal block for computing checksum.
+      long actualNumBytes = blockGroup.getNumBytes();
+      blockGroup.setNumBytes(requestedNumBytes);
+
+      ExtendedBlock block = StripedBlockUtil.constructInternalBlock(blockGroup,
+          ecPolicy.getCellSize(), numDataUnits, idx);
+
+      // Set back actualNumBytes value in blockGroup.
+      blockGroup.setNumBytes(actualNumBytes);
+      return block;
+    }
+
     private void checksumBlock(ExtendedBlock block, int blockIdx,
                                Token<BlockTokenIdentifier> blockToken,
                                DatanodeInfo targetDatanode) throws IOException {
@@ -446,9 +466,7 @@ final class BlockChecksumHelper {
         //read md5
         final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
         md5.write(md5writer);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
-        }
+        LOG.debug("got reply from datanode:{}, md5={}", targetDatanode, md5);
       }
     }
 
@@ -456,34 +474,35 @@ final class BlockChecksumHelper {
      * Reconstruct this data block and recalculate checksum.
      *
      * @param errBlkIndex
-     *          error index to be reconstrcuted and recalculate checksum.
+     *          error index to be reconstructed and recalculate checksum.
+     * @param blockLength
+     *          number of bytes in the block to compute checksum.
      * @throws IOException
      */
-    private void recalculateChecksum(int errBlkIndex) throws IOException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Recalculate checksum for the missing/failed block index "
-            + errBlkIndex);
-      }
+    private void recalculateChecksum(int errBlkIndex, long blockLength)
+        throws IOException {
+      LOG.debug("Recalculate checksum for the missing/failed block index {}",
+          errBlkIndex);
       byte[] errIndices = new byte[1];
       errIndices[0] = (byte) errBlkIndex;
+
       StripedReconstructionInfo stripedReconInfo =
           new StripedReconstructionInfo(
-          blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
+              blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
       final StripedBlockChecksumReconstructor checksumRecon =
           new StripedBlockChecksumReconstructor(
-          getDatanode().getErasureCodingWorker(), stripedReconInfo,
-          md5writer);
+              getDatanode().getErasureCodingWorker(), stripedReconInfo,
+              md5writer, blockLength);
       checksumRecon.reconstruct();
 
       DataChecksum checksum = checksumRecon.getChecksum();
       long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0
           : checksumRecon.getChecksumDataLen() / checksum.getChecksumSize();
-      setOrVerifyChecksumProperties(errBlkIndex, checksum.getBytesPerChecksum(),
-          crcPerBlock, checksum.getChecksumType());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Recalculated checksum for the block index " + errBlkIndex
-            + ": md5=" + checksumRecon.getMD5());
-      }
+      setOrVerifyChecksumProperties(errBlkIndex,
+          checksum.getBytesPerChecksum(), crcPerBlock,
+          checksum.getChecksumType());
+      LOG.debug("Recalculated checksum for the block index:{}, md5={}",
+          errBlkIndex, checksumRecon.getMD5());
     }
 
     private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
@@ -509,11 +528,9 @@ final class BlockChecksumHelper {
         setCrcType(DataChecksum.Type.MIXED);
       }
 
-      if (LOG.isDebugEnabled()) {
-        if (blockIdx == 0) {
-          LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
-              + ", crcPerBlock=" + getCrcPerBlock());
-        }
+      if (blockIdx == 0) {
+        LOG.debug("set bytesPerCRC={}, crcPerBlock={}", getBytesPerCRC(),
+            getCrcPerBlock());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 829badd..9236a19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -964,7 +964,7 @@ class DataXceiver extends Receiver implements Runnable {
 
   @Override
   public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
-                                 final Token<BlockTokenIdentifier> blockToken)
+      final Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
       throws IOException {
     updateCurrentThreadName("Getting checksum for block group" +
         stripedBlockInfo.getBlock());
@@ -973,7 +973,8 @@ class DataXceiver extends Receiver implements Runnable {
         Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
 
     AbstractBlockChecksumComputer maker =
-        new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo);
+        new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo,
+            requestedNumBytes);
 
     try {
       maker.compute();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
index 1b6758b..c7294c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -41,14 +42,17 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
   private DataOutputBuffer checksumWriter;
   private MD5Hash md5;
   private long checksumDataLen;
+  private long requestedLen;
 
   public StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
       StripedReconstructionInfo stripedReconInfo,
-      DataOutputBuffer checksumWriter) throws IOException {
+      DataOutputBuffer checksumWriter,
+      long requestedBlockLength) throws IOException {
     super(worker, stripedReconInfo);
     this.targetIndices = stripedReconInfo.getTargetIndices();
     assert targetIndices != null;
     this.checksumWriter = checksumWriter;
+    this.requestedLen = requestedBlockLength;
     init();
   }
 
@@ -69,8 +73,9 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
 
   public void reconstruct() throws IOException {
     MessageDigest digester = MD5Hash.getDigester();
-    while (getPositionInBlock() < getMaxTargetLength()) {
-      long remaining = getMaxTargetLength() - getPositionInBlock();
+    long maxTargetLength = getMaxTargetLength();
+    while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) {
+      long remaining = maxTargetLength - getPositionInBlock();
       final int toReconstructLen = (int) Math
           .min(getStripedReader().getBufferSize(), remaining);
       // step1: read from minimum source DNs required for reconstruction.
@@ -81,13 +86,11 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
       reconstructTargets(toReconstructLen);
 
       // step3: calculate checksum
-      getChecksum().calculateChunkedSums(targetBuffer.array(), 0,
-          targetBuffer.remaining(), checksumBuf, 0);
+      checksumDataLen += checksumWithTargetOutput(targetBuffer.array(),
+          toReconstructLen, digester);
 
-      // step4: updates the digest using the checksum array of bytes
-      digester.update(checksumBuf, 0, checksumBuf.length);
-      checksumDataLen += checksumBuf.length;
       updatePositionInBlock(toReconstructLen);
+      requestedLen -= toReconstructLen;
       clearBuffers();
     }
 
@@ -96,6 +99,56 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
     md5.write(checksumWriter);
   }
 
+  private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen,
+      MessageDigest digester) throws IOException {
+    long checksumDataLength = 0;
+    // Calculate partial block checksum. There are two cases.
+    // case-1) the leading data bytes that form whole bytesPerCRC chunks
+    // case-2) the trailing data bytes that are fewer than bytesPerCRC
+    if (requestedLen <= toReconstructLen) {
+      int remainingLen = (int) requestedLen;
+      outputData = Arrays.copyOf(targetBuffer.array(), remainingLen);
+
+      int partialLength = remainingLen % getChecksum().getBytesPerChecksum();
+
+      int checksumRemaining = (remainingLen
+          / getChecksum().getBytesPerChecksum())
+          * getChecksum().getChecksumSize();
+
+      int dataOffset = 0;
+
+      // case-1) checksum the leading whole bytesPerCRC chunks
+      if (checksumRemaining > 0) {
+        remainingLen = remainingLen - partialLength;
+        checksumBuf = new byte[checksumRemaining];
+        getChecksum().calculateChunkedSums(outputData, dataOffset,
+            remainingLen, checksumBuf, 0);
+        digester.update(checksumBuf, 0, checksumBuf.length);
+        checksumDataLength = checksumBuf.length;
+        dataOffset = remainingLen;
+      }
+
+      // case-2) checksum the trailing bytes, fewer than bytesPerCRC
+      if (partialLength > 0) {
+        byte[] partialCrc = new byte[getChecksum().getChecksumSize()];
+        getChecksum().update(outputData, dataOffset, partialLength);
+        getChecksum().writeValue(partialCrc, 0, true);
+        digester.update(partialCrc);
+        checksumDataLength += partialCrc.length;
+      }
+
+      clearBuffers();
+      // calculated checksum for the requested length, return checksum length.
+      return checksumDataLength;
+    }
+    getChecksum().calculateChunkedSums(outputData, 0,
+        outputData.length, checksumBuf, 0);
+
+    // updates digest using the checksum array of bytes
+    digester.update(checksumBuf, 0, checksumBuf.length);
+    return checksumBuf.length;
+  }
+
   private void reconstructTargets(int toReconstructLen) {
     initDecoderIfNecessary();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
index 3bee6be..908ab0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.Path;
@@ -31,6 +29,9 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 
 import java.io.IOException;
 
@@ -41,7 +42,8 @@ import java.io.IOException;
  * are the same.
  */
 public class TestFileChecksum {
-  public static final Log LOG = LogFactory.getLog(TestFileChecksum.class);
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestFileChecksum.class);
 
   private int dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
   private int parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS;
@@ -58,6 +60,7 @@ public class TestFileChecksum {
   private int stripSize = cellSize * dataBlocks;
   private int blockGroupSize = stripesPerBlock * stripSize;
   private int fileSize = numBlockGroups * blockGroupSize;
+  private int bytesPerCRC;
 
   private String ecDir = "/striped";
   private String stripedFile1 = ecDir + "/stripedFileChecksum1";
@@ -79,10 +82,9 @@ public class TestFileChecksum {
     fs = cluster.getFileSystem();
     client = fs.getClient();
 
-    prepareTestFiles();
-
-    getDataNodeToKill(stripedFile1);
-    getDataNodeToKill(replicatedFile);
+    bytesPerCRC = conf.getInt(
+        HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
   }
 
   @After
@@ -93,49 +95,57 @@ public class TestFileChecksum {
     }
   }
 
-  @Test
+  @Test(timeout = 90000)
   public void testStripedFileChecksum1() throws Exception {
     int length = 0;
+    prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
     testStripedFileChecksum(length, length + 10);
   }
 
-  @Test
+  @Test(timeout = 90000)
   public void testStripedFileChecksum2() throws Exception {
     int length = stripSize - 1;
+    prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
     testStripedFileChecksum(length, length - 10);
   }
 
-  @Test
+  @Test(timeout = 90000)
   public void testStripedFileChecksum3() throws Exception {
     int length = stripSize;
+    prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
     testStripedFileChecksum(length, length - 10);
   }
 
-  @Test
+  @Test(timeout = 90000)
   public void testStripedFileChecksum4() throws Exception {
     int length = stripSize + cellSize * 2;
+    prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
     testStripedFileChecksum(length, length - 10);
   }
 
-  @Test
+  @Test(timeout = 90000)
   public void testStripedFileChecksum5() throws Exception {
     int length = blockGroupSize;
+    prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
     testStripedFileChecksum(length, length - 10);
   }
 
-  @Test
+  @Test(timeout = 90000)
   public void testStripedFileChecksum6() throws Exception {
     int length = blockGroupSize + blockSize;
+    prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
     testStripedFileChecksum(length, length - 10);
   }
 
-  @Test
+  @Test(timeout = 90000)
   public void testStripedFileChecksum7() throws Exception {
     int length = -1; // whole file
+    prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
     testStripedFileChecksum(length, fileSize);
   }
 
-  void testStripedFileChecksum(int range1, int range2) throws Exception {
+  private void testStripedFileChecksum(int range1, int range2)
+      throws Exception {
     FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
         range1, false);
     FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2,
@@ -153,8 +163,9 @@ public class TestFileChecksum {
     }
   }
 
-  @Test
+  @Test(timeout = 90000)
   public void testStripedAndReplicatedFileChecksum() throws Exception {
+    prepareTestFiles(fileSize, new String[] {stripedFile1, replicatedFile});
     FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
         10, false);
     FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
@@ -163,8 +174,9 @@ public class TestFileChecksum {
     Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
   }
 
-  @Test
+  @Test(timeout = 90000)
   public void testStripedFileChecksumWithMissedDataBlocks1() throws Exception {
+    prepareTestFiles(fileSize, new String[] {stripedFile1});
     FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, fileSize,
         false);
     FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile1,
@@ -177,8 +189,9 @@ public class TestFileChecksum {
         stripedFileChecksum1.equals(stripedFileChecksumRecon));
   }
 
-  @Test
+  @Test(timeout = 90000)
   public void testStripedFileChecksumWithMissedDataBlocks2() throws Exception {
+    prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
     FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, -1,
         false);
     FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2, -1,
@@ -198,6 +211,255 @@ public class TestFileChecksum {
         stripedFileChecksum2.equals(stripedFileChecksum2Recon));
   }
 
+  private void testStripedFileChecksumWithMissedDataBlocksRangeQuery(
+      String stripedFile, int requestedLen) throws Exception {
+    LOG.info("Checksum file:{}, requested length:{}", stripedFile,
+        requestedLen);
+    prepareTestFiles(fileSize, new String[] {stripedFile});
+    FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile,
+        requestedLen, false);
+    FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile,
+        requestedLen, true);
+
+    LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
+    LOG.info("stripedFileChecksumRecon:" + stripedFileChecksumRecon);
+
+    Assert.assertTrue("Checksum mismatches!",
+        stripedFileChecksum1.equals(stripedFileChecksumRecon));
+  }
+
+  /**
+   * Test to verify that the checksum can be computed for a requested range
+   * less than bytesPerCRC size.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery1()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, 1);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed for a requested range
+   * less than bytesPerCRC size.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery2()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, 10);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving bytesPerCRC
+   * length of file range for checksum calculation. 512 is the value of
+   * bytesPerCRC.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery3()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        bytesPerCRC);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving 'cellsize'
+   * length of file range for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery4()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        cellSize);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving less than
+   * cellsize length of file range for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery5()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        cellSize - 1);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving greater than
+   * cellsize length of file range for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery6()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        cellSize + 1);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving two times
+   * cellsize length of file range for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery7()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        cellSize * 2);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving stripSize
+   * length of file range for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery8()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        stripSize);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving less than
+   * stripSize length of file range for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery9()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        stripSize - 1);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving greater than
+   * stripSize length of file range for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery10()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        stripSize + 1);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving less than
+   * blockGroupSize length of file range for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery11()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        blockGroupSize - 1);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving greater than
+   * blockGroupSize length of file range for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery12()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        blockGroupSize + 1);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving a file range of
+   * length blockGroupSize * numBlockGroups / 2 for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery13()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        blockGroupSize * numBlockGroups / 2);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed by giving less than
+   * fileSize length of file range for checksum calculation.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery14()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        fileSize - 1);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed for a length greater than
+   * file size.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery15()
+      throws Exception {
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+        fileSize * 2);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed for a small file less than
+   * bytesPerCRC size.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery16()
+      throws Exception {
+    int fileLength = 100;
+    String stripedFile3 = ecDir + "/stripedFileChecksum3";
+    prepareTestFiles(fileLength, new String[] {stripedFile3});
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
+        fileLength - 1);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed for a small file less than
+   * bytesPerCRC size.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery17()
+      throws Exception {
+    int fileLength = 100;
+    String stripedFile3 = ecDir + "/stripedFileChecksum3";
+    prepareTestFiles(fileLength, new String[] {stripedFile3});
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, 1);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed for a small file less than
+   * bytesPerCRC size.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery18()
+      throws Exception {
+    int fileLength = 100;
+    String stripedFile3 = ecDir + "/stripedFileChecksum3";
+    prepareTestFiles(fileLength, new String[] {stripedFile3});
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, 10);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed with greater than file
+   * length.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery19()
+      throws Exception {
+    int fileLength = 100;
+    String stripedFile3 = ecDir + "/stripedFileChecksum3";
+    prepareTestFiles(fileLength, new String[] {stripedFile3});
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
+        fileLength * 2);
+  }
+
+  /**
+   * Test to verify that the checksum can be computed for small file with less
+   * than file length.
+   */
+  @Test(timeout = 90000)
+  public void testStripedFileChecksumWithMissedDataBlocksRangeQuery20()
+      throws Exception {
+    int fileLength = bytesPerCRC;
+    String stripedFile3 = ecDir + "/stripedFileChecksum3";
+    prepareTestFiles(fileLength, new String[] {stripedFile3});
+    testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
+        bytesPerCRC - 1);
+  }
+
   private FileChecksum getFileChecksum(String filePath, int range,
                                        boolean killDn) throws Exception {
     int dnIdxToDie = -1;
@@ -223,12 +485,9 @@ public class TestFileChecksum {
     return fc;
   }
 
-  void prepareTestFiles() throws IOException {
-    byte[] fileData = StripedFileTestUtil.generateBytes(fileSize);
-
-    String[] filePaths = new String[] {
-        stripedFile1, stripedFile2, replicatedFile
-    };
+  private void prepareTestFiles(int fileLength, String[] filePaths)
+      throws IOException {
+    byte[] fileData = StripedFileTestUtil.generateBytes(fileLength);
 
     for (String filePath : filePaths) {
       Path testPath = new Path(filePath);
@@ -267,4 +526,4 @@ public class TestFileChecksum {
 
     return -1;
   }
-}
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org