You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/06/23 09:42:13 UTC
hadoop git commit: HDFS-10460. Recompute block checksum for a particular range less than file size on the fly by reconstructing missed block. Contributed by Rakesh R
Repository: hadoop
Updated Branches:
refs/heads/trunk ff07b1080 -> e6cb07520
HDFS-10460. Recompute block checksum for a particular range less than file size on the fly by reconstructing missed block. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6cb0752
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6cb0752
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6cb0752
Branch: refs/heads/trunk
Commit: e6cb07520f935efde3e881de8f84ee7f6e0a746f
Parents: ff07b10
Author: Kai Zheng <ka...@intel.com>
Authored: Fri Jun 24 17:39:32 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Fri Jun 24 17:39:32 2016 +0800
----------------------------------------------------------------------
.../apache/hadoop/hdfs/FileChecksumHelper.java | 13 +-
.../datatransfer/DataTransferProtocol.java | 5 +-
.../hdfs/protocol/datatransfer/Sender.java | 4 +-
.../src/main/proto/datatransfer.proto | 1 +
.../hdfs/protocol/datatransfer/Receiver.java | 3 +-
.../server/datanode/BlockChecksumHelper.java | 81 +++--
.../hdfs/server/datanode/DataXceiver.java | 5 +-
.../StripedBlockChecksumReconstructor.java | 69 ++++-
.../apache/hadoop/hdfs/TestFileChecksum.java | 309 +++++++++++++++++--
9 files changed, 415 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
index c213fa3..fe462f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -454,10 +454,11 @@ final class FileChecksumHelper {
private boolean checksumBlockGroup(
LocatedStripedBlock blockGroup) throws IOException {
ExtendedBlock block = blockGroup.getBlock();
+ long requestedNumBytes = block.getNumBytes();
if (getRemaining() < block.getNumBytes()) {
- block.setNumBytes(getRemaining());
+ requestedNumBytes = getRemaining();
}
- setRemaining(getRemaining() - block.getNumBytes());
+ setRemaining(getRemaining() - requestedNumBytes);
StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
blockGroup.getLocations(), blockGroup.getBlockTokens(),
@@ -468,7 +469,8 @@ final class FileChecksumHelper {
boolean done = false;
for (int j = 0; !done && j < datanodes.length; j++) {
try {
- tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]);
+ tryDatanode(blockGroup, stripedBlockInfo, datanodes[j],
+ requestedNumBytes);
done = true;
} catch (InvalidBlockTokenException ibte) {
if (bgIdx > getLastRetriedIndex()) {
@@ -496,7 +498,8 @@ final class FileChecksumHelper {
*/
private void tryDatanode(LocatedStripedBlock blockGroup,
StripedBlockInfo stripedBlockInfo,
- DatanodeInfo datanode) throws IOException {
+ DatanodeInfo datanode,
+ long requestedNumBytes) throws IOException {
try (IOStreamPair pair = getClient().connectToDN(datanode,
getTimeout(), blockGroup.getBlockToken())) {
@@ -506,7 +509,7 @@ final class FileChecksumHelper {
// get block MD5
createSender(pair).blockGroupChecksum(stripedBlockInfo,
- blockGroup.getBlockToken());
+ blockGroup.getBlockToken(), requestedNumBytes);
BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(pair.in));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index ad3f2ad..94f8906 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -207,8 +207,11 @@ public interface DataTransferProtocol {
*
* @param stripedBlockInfo a striped block info.
* @param blockToken security token for accessing the block.
+ * @param requestedNumBytes requested number of bytes in the block group
+ * to compute the checksum.
* @throws IOException
*/
void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
- Token<BlockTokenIdentifier> blockToken) throws IOException;
+ Token<BlockTokenIdentifier> blockToken,
+ long requestedNumBytes) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index bc73bfc..e133975 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -266,7 +266,8 @@ public class Sender implements DataTransferProtocol {
@Override
public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
+ throws IOException {
OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(
stripedBlockInfo.getBlock(), blockToken))
@@ -278,6 +279,7 @@ public class Sender implements DataTransferProtocol {
.convertBlockIndices(stripedBlockInfo.getBlockIndices()))
.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
stripedBlockInfo.getErasureCodingPolicy()))
+ .setRequestedNumBytes(requestedNumBytes)
.build();
send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 1407351..290b158 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -155,6 +155,7 @@ message OpBlockGroupChecksumProto {
repeated hadoop.common.TokenProto blockTokens = 3;
required ErasureCodingPolicyProto ecPolicy = 4;
repeated uint32 blockIndices = 5;
+ required uint64 requestedNumBytes = 6;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index 8b863f7..08ab967 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -312,7 +312,8 @@ public abstract class Receiver implements DataTransferProtocol {
try {
blockGroupChecksum(stripedBlockInfo,
- PBHelperClient.convert(proto.getHeader().getToken()));
+ PBHelperClient.convert(proto.getHeader().getToken()),
+ proto.getRequestedNumBytes());
} finally {
if (traceScope != null) {
traceScope.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
index ec6bbb6..f549785 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -285,10 +285,8 @@ final class BlockChecksumHelper {
}
setOutBytes(md5out.getDigest());
- if (LOG.isDebugEnabled()) {
- LOG.debug("block=" + getBlock() + ", bytesPerCRC=" + getBytesPerCRC()
- + ", crcPerBlock=" + getCrcPerBlock() + ", md5out=" + md5out);
- }
+ LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}",
+ getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out);
} finally {
IOUtils.closeStream(getChecksumIn());
IOUtils.closeStream(getMetadataIn());
@@ -335,11 +333,13 @@ final class BlockChecksumHelper {
private final DatanodeInfo[] datanodes;
private final Token<BlockTokenIdentifier>[] blockTokens;
private final byte[] blockIndices;
+ private final long requestedNumBytes;
private final DataOutputBuffer md5writer = new DataOutputBuffer();
BlockGroupNonStripedChecksumComputer(DataNode datanode,
- StripedBlockInfo stripedBlockInfo)
+ StripedBlockInfo stripedBlockInfo,
+ long requestedNumBytes)
throws IOException {
super(datanode);
this.blockGroup = stripedBlockInfo.getBlock();
@@ -347,6 +347,7 @@ final class BlockChecksumHelper {
this.datanodes = stripedBlockInfo.getDatanodes();
this.blockTokens = stripedBlockInfo.getBlockTokens();
this.blockIndices = stripedBlockInfo.getBlockIndices();
+ this.requestedNumBytes = requestedNumBytes;
}
private static class LiveBlockInfo {
@@ -380,24 +381,29 @@ final class BlockChecksumHelper {
liveDns.put(blockIndices[idx],
new LiveBlockInfo(datanodes[idx], blockTokens[idx]));
}
+ long checksumLen = 0;
for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) {
try {
+ ExtendedBlock block = getInternalBlock(numDataUnits, idx);
+
LiveBlockInfo liveBlkInfo = liveDns.get((byte) idx);
if (liveBlkInfo == null) {
// reconstruct block and calculate checksum for missing node
- recalculateChecksum(idx);
+ recalculateChecksum(idx, block.getNumBytes());
} else {
try {
- ExtendedBlock block = StripedBlockUtil.constructInternalBlock(
- blockGroup, ecPolicy.getCellSize(), numDataUnits, idx);
checksumBlock(block, idx, liveBlkInfo.getToken(),
liveBlkInfo.getDn());
} catch (IOException ioe) {
LOG.warn("Exception while reading checksum", ioe);
// reconstruct block and calculate checksum for the failed node
- recalculateChecksum(idx);
+ recalculateChecksum(idx, block.getNumBytes());
}
}
+ checksumLen += block.getNumBytes();
+ if (checksumLen >= requestedNumBytes) {
+ break; // done with the computation, simply return.
+ }
} catch (IOException e) {
LOG.warn("Failed to get the checksum", e);
}
@@ -407,6 +413,20 @@ final class BlockChecksumHelper {
setOutBytes(md5out.getDigest());
}
+ /**
+ * Constructs the internal block at {@code idx}, sized as if the block group
+ * held only {@code requestedNumBytes} bytes.
+ */
+ private ExtendedBlock getInternalBlock(int numDataUnits, int idx) {
+ // Temporarily set the requested number of bytes in blockGroup, which is
+ // required to construct the internal block for computing checksum.
+ long actualNumBytes = blockGroup.getNumBytes();
+ blockGroup.setNumBytes(requestedNumBytes);
+ try {
+ return StripedBlockUtil.constructInternalBlock(blockGroup,
+ ecPolicy.getCellSize(), numDataUnits, idx);
+ } finally {
+ // Always set back actualNumBytes in blockGroup, even if construction
+ // throws, so the block group is never left with the requested length.
+ blockGroup.setNumBytes(actualNumBytes);
+ }
+ }
+
private void checksumBlock(ExtendedBlock block, int blockIdx,
Token<BlockTokenIdentifier> blockToken,
DatanodeInfo targetDatanode) throws IOException {
@@ -446,9 +466,7 @@ final class BlockChecksumHelper {
//read md5
final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
md5.write(md5writer);
- if (LOG.isDebugEnabled()) {
- LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
- }
+ LOG.debug("got reply from datanode:{}, md5={}", targetDatanode, md5);
}
}
@@ -456,34 +474,35 @@ final class BlockChecksumHelper {
* Reconstruct this data block and recalculate checksum.
*
* @param errBlkIndex
- * error index to be reconstrcuted and recalculate checksum.
+ * error index to be reconstructed and recalculate checksum.
+ * @param blockLength
+ * number of bytes in the block to compute checksum.
* @throws IOException
*/
- private void recalculateChecksum(int errBlkIndex) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recalculate checksum for the missing/failed block index "
- + errBlkIndex);
- }
+ private void recalculateChecksum(int errBlkIndex, long blockLength)
+ throws IOException {
+ LOG.debug("Recalculate checksum for the missing/failed block index {}",
+ errBlkIndex);
byte[] errIndices = new byte[1];
errIndices[0] = (byte) errBlkIndex;
+
StripedReconstructionInfo stripedReconInfo =
new StripedReconstructionInfo(
- blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
+ blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
final StripedBlockChecksumReconstructor checksumRecon =
new StripedBlockChecksumReconstructor(
- getDatanode().getErasureCodingWorker(), stripedReconInfo,
- md5writer);
+ getDatanode().getErasureCodingWorker(), stripedReconInfo,
+ md5writer, blockLength);
checksumRecon.reconstruct();
DataChecksum checksum = checksumRecon.getChecksum();
long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0
: checksumRecon.getChecksumDataLen() / checksum.getChecksumSize();
- setOrVerifyChecksumProperties(errBlkIndex, checksum.getBytesPerChecksum(),
- crcPerBlock, checksum.getChecksumType());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recalculated checksum for the block index " + errBlkIndex
- + ": md5=" + checksumRecon.getMD5());
- }
+ setOrVerifyChecksumProperties(errBlkIndex,
+ checksum.getBytesPerChecksum(), crcPerBlock,
+ checksum.getChecksumType());
+ LOG.debug("Recalculated checksum for the block index:{}, md5={}",
+ errBlkIndex, checksumRecon.getMD5());
}
private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
@@ -509,11 +528,9 @@ final class BlockChecksumHelper {
setCrcType(DataChecksum.Type.MIXED);
}
- if (LOG.isDebugEnabled()) {
- if (blockIdx == 0) {
- LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
- + ", crcPerBlock=" + getCrcPerBlock());
- }
+ if (blockIdx == 0) {
+ LOG.debug("set bytesPerCRC={}, crcPerBlock={}", getBytesPerCRC(),
+ getCrcPerBlock());
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 829badd..9236a19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -964,7 +964,7 @@ class DataXceiver extends Receiver implements Runnable {
@Override
public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
- final Token<BlockTokenIdentifier> blockToken)
+ final Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
throws IOException {
updateCurrentThreadName("Getting checksum for block group" +
stripedBlockInfo.getBlock());
@@ -973,7 +973,8 @@ class DataXceiver extends Receiver implements Runnable {
Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
AbstractBlockChecksumComputer maker =
- new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo);
+ new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo,
+ requestedNumBytes);
try {
maker.compute();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
index 1b6758b..c7294c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.erasurecode;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -41,14 +42,17 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
private DataOutputBuffer checksumWriter;
private MD5Hash md5;
private long checksumDataLen;
+ private long requestedLen;
public StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
StripedReconstructionInfo stripedReconInfo,
- DataOutputBuffer checksumWriter) throws IOException {
+ DataOutputBuffer checksumWriter,
+ long requestedBlockLength) throws IOException {
super(worker, stripedReconInfo);
this.targetIndices = stripedReconInfo.getTargetIndices();
assert targetIndices != null;
this.checksumWriter = checksumWriter;
+ this.requestedLen = requestedBlockLength;
init();
}
@@ -69,8 +73,9 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
public void reconstruct() throws IOException {
MessageDigest digester = MD5Hash.getDigester();
- while (getPositionInBlock() < getMaxTargetLength()) {
- long remaining = getMaxTargetLength() - getPositionInBlock();
+ long maxTargetLength = getMaxTargetLength();
+ while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) {
+ long remaining = maxTargetLength - getPositionInBlock();
final int toReconstructLen = (int) Math
.min(getStripedReader().getBufferSize(), remaining);
// step1: read from minimum source DNs required for reconstruction.
@@ -81,13 +86,11 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
reconstructTargets(toReconstructLen);
// step3: calculate checksum
- getChecksum().calculateChunkedSums(targetBuffer.array(), 0,
- targetBuffer.remaining(), checksumBuf, 0);
+ checksumDataLen += checksumWithTargetOutput(targetBuffer.array(),
+ toReconstructLen, digester);
- // step4: updates the digest using the checksum array of bytes
- digester.update(checksumBuf, 0, checksumBuf.length);
- checksumDataLen += checksumBuf.length;
updatePositionInBlock(toReconstructLen);
+ requestedLen -= toReconstructLen;
clearBuffers();
}
@@ -96,6 +99,56 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
md5.write(checksumWriter);
}
+ /**
+ * Computes the CRCs of the reconstructed target data, feeds them into the
+ * digester and returns the number of checksum bytes produced.
+ *
+ * @param outputData reconstructed data of the target block
+ * @param toReconstructLen number of bytes reconstructed in this round
+ * @param digester MD5 digester accumulating the block checksum
+ * @return number of checksum bytes added to the digester
+ */
+ private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen,
+ MessageDigest digester) throws IOException {
+ // Last round: only requestedLen (<= toReconstructLen) bytes are wanted, so
+ // checksum just that prefix of outputData. It splits into two parts:
+ // case-1) whole bytesPerCRC-sized chunks, checksummed chunk by chunk
+ // case-2) a trailing partial chunk shorter than bytesPerCRC
+ if (requestedLen <= toReconstructLen) {
+ int remainingLen = (int) requestedLen;
+ int bytesPerCRC = getChecksum().getBytesPerChecksum();
+ int checksumSize = getChecksum().getChecksumSize();
+ int partialLength = remainingLen % bytesPerCRC;
+ int fullChunksLength = remainingLen - partialLength;
+ long checksumDataLength = 0;
+
+ // case-1) whole chunks. Read straight from outputData using offsets
+ // instead of copying it, and use a local buffer rather than replacing
+ // the shared checksumBuf field.
+ if (fullChunksLength > 0) {
+ byte[] chunkSums =
+ new byte[(fullChunksLength / bytesPerCRC) * checksumSize];
+ getChecksum().calculateChunkedSums(outputData, 0, fullChunksLength,
+ chunkSums, 0);
+ digester.update(chunkSums, 0, chunkSums.length);
+ checksumDataLength = chunkSums.length;
+ }
+
+ // case-2) trailing partial chunk shorter than bytesPerCRC
+ if (partialLength > 0) {
+ byte[] partialCrc = new byte[checksumSize];
+ getChecksum().update(outputData, fullChunksLength, partialLength);
+ getChecksum().writeValue(partialCrc, 0, true);
+ digester.update(partialCrc);
+ checksumDataLength += partialCrc.length;
+ }
+
+ clearBuffers();
+ // calculated checksum for the requested length, return checksum length.
+ return checksumDataLength;
+ }
+
+ // More data is still requested: checksum the whole round.
+ getChecksum().calculateChunkedSums(outputData, 0,
+ outputData.length, checksumBuf, 0);
+
+ // updates digest using the checksum array of bytes
+ digester.update(checksumBuf, 0, checksumBuf.length);
+ return checksumBuf.length;
+ }
+
private void reconstructTargets(int toReconstructLen) {
initDecoderIfNecessary();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cb0752/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
index 3bee6be..908ab0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.Path;
@@ -31,6 +29,9 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import java.io.IOException;
@@ -41,7 +42,8 @@ import java.io.IOException;
* are the same.
*/
public class TestFileChecksum {
- public static final Log LOG = LogFactory.getLog(TestFileChecksum.class);
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestFileChecksum.class);
private int dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
private int parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS;
@@ -58,6 +60,7 @@ public class TestFileChecksum {
private int stripSize = cellSize * dataBlocks;
private int blockGroupSize = stripesPerBlock * stripSize;
private int fileSize = numBlockGroups * blockGroupSize;
+ private int bytesPerCRC;
private String ecDir = "/striped";
private String stripedFile1 = ecDir + "/stripedFileChecksum1";
@@ -79,10 +82,9 @@ public class TestFileChecksum {
fs = cluster.getFileSystem();
client = fs.getClient();
- prepareTestFiles();
-
- getDataNodeToKill(stripedFile1);
- getDataNodeToKill(replicatedFile);
+ bytesPerCRC = conf.getInt(
+ HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+ HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
}
@After
@@ -93,49 +95,57 @@ public class TestFileChecksum {
}
}
- @Test
+ @Test(timeout = 90000)
public void testStripedFileChecksum1() throws Exception {
int length = 0;
+ prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length + 10);
}
- @Test
+ @Test(timeout = 90000)
public void testStripedFileChecksum2() throws Exception {
int length = stripSize - 1;
+ prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length - 10);
}
- @Test
+ @Test(timeout = 90000)
public void testStripedFileChecksum3() throws Exception {
int length = stripSize;
+ prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length - 10);
}
- @Test
+ @Test(timeout = 90000)
public void testStripedFileChecksum4() throws Exception {
int length = stripSize + cellSize * 2;
+ prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length - 10);
}
- @Test
+ @Test(timeout = 90000)
public void testStripedFileChecksum5() throws Exception {
int length = blockGroupSize;
+ prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length - 10);
}
- @Test
+ @Test(timeout = 90000)
public void testStripedFileChecksum6() throws Exception {
int length = blockGroupSize + blockSize;
+ prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length - 10);
}
- @Test
+ @Test(timeout = 90000)
public void testStripedFileChecksum7() throws Exception {
int length = -1; // whole file
+ prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, fileSize);
}
- void testStripedFileChecksum(int range1, int range2) throws Exception {
+ private void testStripedFileChecksum(int range1, int range2)
+ throws Exception {
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
range1, false);
FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2,
@@ -153,8 +163,9 @@ public class TestFileChecksum {
}
}
- @Test
+ @Test(timeout = 90000)
public void testStripedAndReplicatedFileChecksum() throws Exception {
+ prepareTestFiles(fileSize, new String[] {stripedFile1, replicatedFile});
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
10, false);
FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
@@ -163,8 +174,9 @@ public class TestFileChecksum {
Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
}
- @Test
+ @Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocks1() throws Exception {
+ prepareTestFiles(fileSize, new String[] {stripedFile1});
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, fileSize,
false);
FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile1,
@@ -177,8 +189,9 @@ public class TestFileChecksum {
stripedFileChecksum1.equals(stripedFileChecksumRecon));
}
- @Test
+ @Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocks2() throws Exception {
+ prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, -1,
false);
FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2, -1,
@@ -198,6 +211,255 @@ public class TestFileChecksum {
stripedFileChecksum2.equals(stripedFileChecksum2Recon));
}
+ /**
+ * Creates {@code stripedFile} and asserts that its checksum over the first
+ * {@code requestedLen} bytes is the same whether getFileChecksum is called
+ * with {@code false} or {@code true} (the latter presumably forcing block
+ * reconstruction - see getFileChecksum).
+ */
+ private void testStripedFileChecksumWithMissedDataBlocksRangeQuery(
+ String stripedFile, int requestedLen) throws Exception {
+ LOG.info("Checksum file:{}, requested length:{}", stripedFile,
+ requestedLen);
+ prepareTestFiles(fileSize, new String[] {stripedFile});
+ FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile,
+ requestedLen, false);
+ FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile,
+ requestedLen, true);
+
+ LOG.info("stripedFileChecksum1:{}", stripedFileChecksum1);
+ LOG.info("stripedFileChecksumRecon:{}", stripedFileChecksumRecon);
+
+ // assertEquals reports both checksums on failure.
+ Assert.assertEquals("Checksum mismatches!", stripedFileChecksum1,
+ stripedFileChecksumRecon);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed for a 1-byte file range,
+ * which is smaller than bytesPerCRC.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery1()
+ throws Exception {
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, 1);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed for a 10-byte file range,
+ * which is smaller than bytesPerCRC.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery2()
+ throws Exception {
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, 10);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed by giving a file range of
+ * exactly bytesPerCRC bytes. bytesPerCRC is read from the configuration
+ * (DFS_BYTES_PER_CHECKSUM_KEY, defaulting to DFS_BYTES_PER_CHECKSUM_DEFAULT).
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery3()
+ throws Exception {
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+ bytesPerCRC);
+ }
+
+ /**
+ * Checks that the range checksum can be computed when the requested length
+ * is exactly one cell (cellSize bytes).
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery4()
+ throws Exception {
+ int len = cellSize;
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, len);
+ }
+
+ /**
+ * Checks that the range checksum can be computed when the requested length
+ * is one byte short of a full cell.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery5()
+ throws Exception {
+ int len = cellSize - 1;
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, len);
+ }
+
+ /**
+ * Checks that the range checksum can be computed when the requested length
+ * is one byte past a full cell.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery6()
+ throws Exception {
+ int len = cellSize + 1;
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, len);
+ }
+
+ /**
+ * Checks that the range checksum can be computed when the requested length
+ * spans exactly two cells.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery7()
+ throws Exception {
+ int len = 2 * cellSize;
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, len);
+ }
+
+ /**
+ * Checks that the range checksum can be computed when the requested length
+ * is exactly one stripe (stripSize bytes).
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery8()
+ throws Exception {
+ int len = stripSize;
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, len);
+ }
+
+ /**
+ * Checks that the range checksum can be computed when the requested length
+ * is one byte short of a full stripe.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery9()
+ throws Exception {
+ int len = stripSize - 1;
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, len);
+ }
+
+ /**
+ * Checks that the range checksum can be computed when the requested length
+ * is one byte past a full stripe.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery10()
+ throws Exception {
+ int len = stripSize + 1;
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, len);
+ }
+
+ /**
+ * Checks that the range checksum can be computed when the requested length
+ * is one byte short of a full block group.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery11()
+ throws Exception {
+ int len = blockGroupSize - 1;
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, len);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed when the requested file
+ * range length is one byte longer than blockGroupSize (blockGroupSize + 1).
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery12()
+ throws Exception {
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+ blockGroupSize + 1);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed when the requested file
+ * range spans half of numBlockGroups block groups (blockGroupSize * numBlockGroups / 2).
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery13()
+ throws Exception {
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+ blockGroupSize * numBlockGroups / 2);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed when the requested file
+ * range length is one byte shorter than fileSize (fileSize - 1).
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery14()
+ throws Exception {
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+ fileSize - 1);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed when the requested file
+ * range length exceeds the file size (fileSize * 2).
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery15()
+ throws Exception {
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
+ fileSize * 2);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed for a small 100-byte file
+ * (less than bytesPerCRC) using a range of fileLength - 1 bytes.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery16()
+ throws Exception {
+ int fileLength = 100;
+ String stripedFile3 = ecDir + "/stripedFileChecksum3";
+ prepareTestFiles(fileLength, new String[] {stripedFile3});
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
+ fileLength - 1);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed for a small 100-byte file
+ * (less than bytesPerCRC) using a range of just 1 byte.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery17()
+ throws Exception {
+ int fileLength = 100;
+ String stripedFile3 = ecDir + "/stripedFileChecksum3";
+ prepareTestFiles(fileLength, new String[] {stripedFile3});
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, 1);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed for a small 100-byte file
+ * (less than bytesPerCRC) using a range of 10 bytes.
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery18()
+ throws Exception {
+ int fileLength = 100;
+ String stripedFile3 = ecDir + "/stripedFileChecksum3";
+ prepareTestFiles(fileLength, new String[] {stripedFile3});
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, 10);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed for a small 100-byte file
+ * using a range longer than the file itself (fileLength * 2).
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery19()
+ throws Exception {
+ int fileLength = 100;
+ String stripedFile3 = ecDir + "/stripedFileChecksum3";
+ prepareTestFiles(fileLength, new String[] {stripedFile3});
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
+ fileLength * 2);
+ }
+
+ /**
+ * Test to verify that the checksum can be computed for a file exactly
+ * bytesPerCRC bytes long using a range one byte shorter (bytesPerCRC - 1).
+ */
+ @Test(timeout = 90000)
+ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery20()
+ throws Exception {
+ int fileLength = bytesPerCRC;
+ String stripedFile3 = ecDir + "/stripedFileChecksum3";
+ prepareTestFiles(fileLength, new String[] {stripedFile3});
+ testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
+ bytesPerCRC - 1);
+ }
+
private FileChecksum getFileChecksum(String filePath, int range,
boolean killDn) throws Exception {
int dnIdxToDie = -1;
@@ -223,12 +485,9 @@ public class TestFileChecksum {
return fc;
}
- void prepareTestFiles() throws IOException {
- byte[] fileData = StripedFileTestUtil.generateBytes(fileSize);
-
- String[] filePaths = new String[] {
- stripedFile1, stripedFile2, replicatedFile
- };
+ private void prepareTestFiles(int fileLength, String[] filePaths)
+ throws IOException {
+ byte[] fileData = StripedFileTestUtil.generateBytes(fileLength);
for (String filePath : filePaths) {
Path testPath = new Path(filePath);
@@ -267,4 +526,4 @@ public class TestFileChecksum {
return -1;
}
-}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org