Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/06/01 04:58:12 UTC
hadoop git commit: HDFS-9833. Erasure coding: recomputing block checksum on the fly by reconstructing the missed/corrupt block data. Contributed by Rakesh R.
Repository: hadoop
Updated Branches:
refs/heads/trunk 8ceb06e23 -> d749cf65e
HDFS-9833. Erasure coding: recomputing block checksum on the fly by reconstructing the missed/corrupt block data. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d749cf65
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d749cf65
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d749cf65
Branch: refs/heads/trunk
Commit: d749cf65e1ab0e0daf5be86931507183f189e855
Parents: 8ceb06e
Author: Kai Zheng <ka...@intel.com>
Authored: Thu Jun 2 12:56:21 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Thu Jun 2 12:56:21 2016 +0800
----------------------------------------------------------------------
.../apache/hadoop/hdfs/FileChecksumHelper.java | 3 +-
.../hadoop/hdfs/protocol/StripedBlockInfo.java | 10 +-
.../hdfs/protocol/datatransfer/Sender.java | 2 +
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 16 ++
.../src/main/proto/datatransfer.proto | 1 +
.../hdfs/protocol/datatransfer/Receiver.java | 1 +
.../server/datanode/BlockChecksumHelper.java | 172 ++++++++++++++-----
.../erasurecode/ErasureCodingWorker.java | 15 +-
.../StripedBlockChecksumReconstructor.java | 129 ++++++++++++++
.../erasurecode/StripedBlockReconstructor.java | 119 +++++++++++++
.../datanode/erasurecode/StripedReader.java | 22 +--
.../erasurecode/StripedReconstructionInfo.java | 99 +++++++++++
.../erasurecode/StripedReconstructor.java | 169 +++++++-----------
.../datanode/erasurecode/StripedWriter.java | 29 ++--
.../hdfs/TestDecommissionWithStriped.java | 47 +++++
.../apache/hadoop/hdfs/TestFileChecksum.java | 41 ++++-
16 files changed, 675 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
index dfd9393..c213fa3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -460,7 +460,8 @@ final class FileChecksumHelper {
setRemaining(getRemaining() - block.getNumBytes());
StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
- blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy);
+ blockGroup.getLocations(), blockGroup.getBlockTokens(),
+ blockGroup.getBlockIndices(), ecPolicy);
DatanodeInfo[] datanodes = blockGroup.getLocations();
//try each datanode in the block group.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
index 74e8081..e46fabc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java
@@ -32,14 +32,16 @@ public class StripedBlockInfo {
private final ExtendedBlock block;
private final DatanodeInfo[] datanodes;
private final Token<BlockTokenIdentifier>[] blockTokens;
+ private final byte[] blockIndices;
private final ErasureCodingPolicy ecPolicy;
public StripedBlockInfo(ExtendedBlock block, DatanodeInfo[] datanodes,
- Token<BlockTokenIdentifier>[] blockTokens,
- ErasureCodingPolicy ecPolicy) {
+ Token<BlockTokenIdentifier>[] blockTokens, byte[] blockIndices,
+ ErasureCodingPolicy ecPolicy) {
this.block = block;
this.datanodes = datanodes;
this.blockTokens = blockTokens;
+ this.blockIndices = blockIndices;
this.ecPolicy = ecPolicy;
}
@@ -55,6 +57,10 @@ public class StripedBlockInfo {
return blockTokens;
}
+ public byte[] getBlockIndices() {
+ return blockIndices;
+ }
+
public ErasureCodingPolicy getErasureCodingPolicy() {
return ecPolicy;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 585ed99..bc73bfc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -274,6 +274,8 @@ public class Sender implements DataTransferProtocol {
stripedBlockInfo.getDatanodes()))
.addAllBlockTokens(PBHelperClient.convert(
stripedBlockInfo.getBlockTokens()))
+ .addAllBlockIndices(PBHelperClient
+ .convertBlockIndices(stripedBlockInfo.getBlockIndices()))
.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
stripedBlockInfo.getErasureCodingPolicy()))
.build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index a05567b..d5bb1e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -853,6 +853,22 @@ public class PBHelperClient {
return results;
}
+ public static List<Integer> convertBlockIndices(byte[] blockIndices) {
+ List<Integer> results = new ArrayList<>(blockIndices.length);
+ for (byte bt : blockIndices) {
+ results.add(Integer.valueOf(bt));
+ }
+ return results;
+ }
+
+ public static byte[] convertBlockIndices(List<Integer> blockIndices) {
+ byte[] blkIndices = new byte[blockIndices.size()];
+ for (int i = 0; i < blockIndices.size(); i++) {
+ blkIndices[i] = (byte) blockIndices.get(i).intValue();
+ }
+ return blkIndices;
+ }
+
public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
List<StorageTypeProto> cList = proto.getCreationPolicy()
.getStorageTypesList();
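For reference, the two new convertBlockIndices helpers are exact inverses of each other: indices travel on the wire as a repeated uint32 list and come back as a compact byte[]. Below is a minimal, standalone round-trip check that mirrors their logic; the class and method names are illustrative only and are not part of Hadoop.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BlockIndicesRoundTrip {
  // Mirrors convertBlockIndices(byte[]): widen each byte to an Integer.
  static List<Integer> toIntList(byte[] blockIndices) {
    List<Integer> results = new ArrayList<>(blockIndices.length);
    for (byte bt : blockIndices) {
      results.add(Integer.valueOf(bt));
    }
    return results;
  }

  // Mirrors convertBlockIndices(List<Integer>): narrow each Integer to a byte.
  static byte[] toByteArray(List<Integer> blockIndices) {
    byte[] blkIndices = new byte[blockIndices.size()];
    for (int i = 0; i < blockIndices.size(); i++) {
      blkIndices[i] = (byte) blockIndices.get(i).intValue();
    }
    return blkIndices;
  }

  public static void main(String[] args) {
    // Indices of internal blocks in an RS(6,3) group: 6 data + 3 parity.
    byte[] original = {0, 1, 2, 3, 4, 5, 6, 7, 8};
    byte[] roundTripped = toByteArray(toIntList(original));
    System.out.println(Arrays.equals(original, roundTripped)); // true
  }
}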
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 522ee06..1407351 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -154,6 +154,7 @@ message OpBlockGroupChecksumProto {
// each internal block has a block token
repeated hadoop.common.TokenProto blockTokens = 3;
required ErasureCodingPolicyProto ecPolicy = 4;
+ repeated uint32 blockIndices = 5;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index b2f26f8..8b863f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -306,6 +306,7 @@ public abstract class Receiver implements DataTransferProtocol {
PBHelperClient.convert(proto.getHeader().getBlock()),
PBHelperClient.convert(proto.getDatanodes()),
PBHelperClient.convertTokens(proto.getBlockTokensList()),
+ PBHelperClient.convertBlockIndices(proto.getBlockIndicesList()),
PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy())
);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
index 1f1a25c..ec6bbb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -30,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumReconstructor;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedReconstructionInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -46,11 +50,14 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
+import java.util.HashMap;
+import java.util.Map;
/**
* Utilities for Block checksum computing, for both replicated and striped
* blocks.
*/
+@InterfaceAudience.Private
final class BlockChecksumHelper {
static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
@@ -327,6 +334,7 @@ final class BlockChecksumHelper {
private final ErasureCodingPolicy ecPolicy;
private final DatanodeInfo[] datanodes;
private final Token<BlockTokenIdentifier>[] blockTokens;
+ private final byte[] blockIndices;
private final DataOutputBuffer md5writer = new DataOutputBuffer();
@@ -338,17 +346,61 @@ final class BlockChecksumHelper {
this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
this.datanodes = stripedBlockInfo.getDatanodes();
this.blockTokens = stripedBlockInfo.getBlockTokens();
+ this.blockIndices = stripedBlockInfo.getBlockIndices();
+ }
+
+ private static class LiveBlockInfo {
+ private final DatanodeInfo dn;
+ private final Token<BlockTokenIdentifier> token;
+
+ LiveBlockInfo(DatanodeInfo dn, Token<BlockTokenIdentifier> token) {
+ this.dn = dn;
+ this.token = token;
+ }
+
+ DatanodeInfo getDn() {
+ return dn;
+ }
+
+ Token<BlockTokenIdentifier> getToken() {
+ return token;
+ }
}
@Override
void compute() throws IOException {
- for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) {
- ExtendedBlock block =
- StripedBlockUtil.constructInternalBlock(blockGroup,
- ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx);
- DatanodeInfo targetDatanode = datanodes[idx];
- Token<BlockTokenIdentifier> blockToken = blockTokens[idx];
- checksumBlock(block, idx, blockToken, targetDatanode);
+ assert datanodes.length == blockIndices.length;
+
+ Map<Byte, LiveBlockInfo> liveDns = new HashMap<>(datanodes.length);
+ int blkIndxLen = blockIndices.length;
+ int numDataUnits = ecPolicy.getNumDataUnits();
+ // Prepare the live datanode list. Missing data blocks will be
+ // reconstructed and their checksums recalculated.
+ for (int idx = 0; idx < blkIndxLen; idx++) {
+ liveDns.put(blockIndices[idx],
+ new LiveBlockInfo(datanodes[idx], blockTokens[idx]));
+ }
+ for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) {
+ try {
+ LiveBlockInfo liveBlkInfo = liveDns.get((byte) idx);
+ if (liveBlkInfo == null) {
+ // reconstruct block and calculate checksum for missing node
+ recalculateChecksum(idx);
+ } else {
+ try {
+ ExtendedBlock block = StripedBlockUtil.constructInternalBlock(
+ blockGroup, ecPolicy.getCellSize(), numDataUnits, idx);
+ checksumBlock(block, idx, liveBlkInfo.getToken(),
+ liveBlkInfo.getDn());
+ } catch (IOException ioe) {
+ LOG.warn("Exception while reading checksum", ioe);
+ // reconstruct block and calculate checksum for the failed node
+ recalculateChecksum(idx);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to get the checksum", e);
+ }
}
MD5Hash md5out = MD5Hash.digest(md5writer.getData());
@@ -379,52 +431,90 @@ final class BlockChecksumHelper {
DataTransferProtos.OpBlockChecksumResponseProto checksumData =
reply.getChecksumResponse();
- //read byte-per-checksum
- final int bpc = checksumData.getBytesPerCrc();
- if (blockIdx == 0) { //first block
- setBytesPerCRC(bpc);
- } else if (bpc != getBytesPerCRC()) {
- throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
- + " but bytesPerCRC=" + getBytesPerCRC());
- }
-
- //read crc-per-block
- final long cpb = checksumData.getCrcPerBlock();
- if (blockIdx == 0) {
- setCrcPerBlock(cpb);
- }
-
- //read md5
- final MD5Hash md5 = new MD5Hash(
- checksumData.getMd5().toByteArray());
- md5.write(md5writer);
-
// read crc-type
final DataChecksum.Type ct;
if (checksumData.hasCrcType()) {
ct = PBHelperClient.convert(checksumData.getCrcType());
} else {
- LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
- "inferring checksum by reading first byte");
+ LOG.debug("Retrieving checksum from an earlier-version DataNode: "
+ + "inferring checksum by reading first byte");
ct = DataChecksum.Type.DEFAULT;
}
- if (blockIdx == 0) { // first block
- setCrcType(ct);
- } else if (getCrcType() != DataChecksum.Type.MIXED &&
- getCrcType() != ct) {
- // if crc types are mixed in a file
- setCrcType(DataChecksum.Type.MIXED);
- }
-
+ setOrVerifyChecksumProperties(blockIdx, checksumData.getBytesPerCrc(),
+ checksumData.getCrcPerBlock(), ct);
+ //read md5
+ final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
+ md5.write(md5writer);
if (LOG.isDebugEnabled()) {
- if (blockIdx == 0) {
- LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
- + ", crcPerBlock=" + getCrcPerBlock());
- }
LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
}
}
}
+
+ /**
+ * Reconstruct this data block and recalculate checksum.
+ *
+ * @param errBlkIndex
+ * error index to be reconstructed and its checksum recalculated.
+ * @throws IOException
+ */
+ private void recalculateChecksum(int errBlkIndex) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recalculate checksum for the missing/failed block index "
+ + errBlkIndex);
+ }
+ byte[] errIndices = new byte[1];
+ errIndices[0] = (byte) errBlkIndex;
+ StripedReconstructionInfo stripedReconInfo =
+ new StripedReconstructionInfo(
+ blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
+ final StripedBlockChecksumReconstructor checksumRecon =
+ new StripedBlockChecksumReconstructor(
+ getDatanode().getErasureCodingWorker(), stripedReconInfo,
+ md5writer);
+ checksumRecon.reconstruct();
+
+ DataChecksum checksum = checksumRecon.getChecksum();
+ long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0
+ : checksumRecon.getChecksumDataLen() / checksum.getChecksumSize();
+ setOrVerifyChecksumProperties(errBlkIndex, checksum.getBytesPerChecksum(),
+ crcPerBlock, checksum.getChecksumType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recalculated checksum for the block index " + errBlkIndex
+ + ": md5=" + checksumRecon.getMD5());
+ }
+ }
+
+ private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
+ final long cpb, DataChecksum.Type ct) throws IOException {
+ //read byte-per-checksum
+ if (blockIdx == 0) { //first block
+ setBytesPerCRC(bpc);
+ } else if (bpc != getBytesPerCRC()) {
+ throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+ + " but bytesPerCRC=" + getBytesPerCRC());
+ }
+
+ //read crc-per-block
+ if (blockIdx == 0) {
+ setCrcPerBlock(cpb);
+ }
+
+ if (blockIdx == 0) { // first block
+ setCrcType(ct);
+ } else if (getCrcType() != DataChecksum.Type.MIXED &&
+ getCrcType() != ct) {
+ // if crc types are mixed in a file
+ setCrcType(DataChecksum.Type.MIXED);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ if (blockIdx == 0) {
+ LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+ + ", crcPerBlock=" + getCrcPerBlock());
+ }
+ }
+ }
}
}
\ No newline at end of file
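The reshaped compute() above boils down to a small fallback routine: map the live block indices to their datanodes, try the normal remote checksum read for each data unit, and fall back to local reconstruction when the block is missing or the read fails. A simplified skeleton of that flow, using stand-in types rather than the actual Hadoop classes, might look like this:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class GroupChecksumFlow {
  interface BlockChecksummer {
    void checksumBlock(int idx) throws IOException;        // remote read path
    void recalculateChecksum(int idx) throws IOException;  // reconstruct path
  }

  static void compute(byte[] blockIndices, String[] datanodes,
      int numDataUnits, BlockChecksummer helper) {
    // Block index -> live datanode. Locations may be unordered or contain
    // duplicates (e.g. after decommission), so a map keyed by index is used
    // instead of assuming datanodes[idx] serves internal block idx.
    Map<Byte, String> liveDns = new HashMap<>();
    for (int i = 0; i < blockIndices.length; i++) {
      liveDns.put(blockIndices[i], datanodes[i]);
    }
    for (int idx = 0; idx < numDataUnits && idx < blockIndices.length; idx++) {
      try {
        if (liveDns.get((byte) idx) == null) {
          helper.recalculateChecksum(idx);     // block missing: reconstruct
        } else {
          try {
            helper.checksumBlock(idx);         // normal remote checksum read
          } catch (IOException ioe) {
            helper.recalculateChecksum(idx);   // read failed: reconstruct
          }
        }
      } catch (IOException e) {
        // Reconstruction itself failed; log and move on, as the patch does.
      }
    }
  }
}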
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index e7c5abc..aacbb2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -116,19 +116,24 @@ public final class ErasureCodingWorker {
*/
public void processErasureCodingTasks(
Collection<BlockECReconstructionInfo> ecTasks) {
- for (BlockECReconstructionInfo reconstructionInfo : ecTasks) {
+ for (BlockECReconstructionInfo reconInfo : ecTasks) {
try {
- final StripedReconstructor task =
- new StripedReconstructor(this, reconstructionInfo);
+ StripedReconstructionInfo stripedReconInfo =
+ new StripedReconstructionInfo(
+ reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
+ reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
+ reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes());
+ final StripedBlockReconstructor task =
+ new StripedBlockReconstructor(this, stripedReconInfo);
if (task.hasValidTargets()) {
stripedReconstructionPool.submit(task);
} else {
LOG.warn("No missing internal block. Skip reconstruction for task:{}",
- reconstructionInfo);
+ reconInfo);
}
} catch (Throwable e) {
LOG.warn("Failed to reconstruct striped block {}",
- reconstructionInfo.getExtendedBlock().getLocalBlock(), e);
+ reconInfo.getExtendedBlock().getLocalBlock(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
new file mode 100644
index 0000000..1b6758b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.MD5Hash;
+
+/**
+ * StripedBlockChecksumReconstructor reconstructs one or more missing striped
+ * blocks in a striped block group; the number of live striped blocks must be
+ * no less than the number of data blocks. The checksum is then recalculated
+ * using the newly reconstructed block data.
+ */
+@InterfaceAudience.Private
+public class StripedBlockChecksumReconstructor extends StripedReconstructor {
+
+ private ByteBuffer targetBuffer;
+ private final byte[] targetIndices;
+
+ private byte[] checksumBuf;
+ private DataOutputBuffer checksumWriter;
+ private MD5Hash md5;
+ private long checksumDataLen;
+
+ public StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
+ StripedReconstructionInfo stripedReconInfo,
+ DataOutputBuffer checksumWriter) throws IOException {
+ super(worker, stripedReconInfo);
+ this.targetIndices = stripedReconInfo.getTargetIndices();
+ assert targetIndices != null;
+ this.checksumWriter = checksumWriter;
+ init();
+ }
+
+ private void init() throws IOException {
+ getStripedReader().init();
+ // allocate buffer to keep the reconstructed block data
+ targetBuffer = allocateBuffer(getBufferSize());
+ long maxTargetLen = 0L;
+ for (int targetIndex : targetIndices) {
+ maxTargetLen = Math.max(maxTargetLen, getBlockLen(targetIndex));
+ }
+ setMaxTargetLength(maxTargetLen);
+ int checksumSize = getChecksum().getChecksumSize();
+ int bytesPerChecksum = getChecksum().getBytesPerChecksum();
+ int tmpLen = checksumSize * (getBufferSize() / bytesPerChecksum);
+ checksumBuf = new byte[tmpLen];
+ }
+
+ public void reconstruct() throws IOException {
+ MessageDigest digester = MD5Hash.getDigester();
+ while (getPositionInBlock() < getMaxTargetLength()) {
+ long remaining = getMaxTargetLength() - getPositionInBlock();
+ final int toReconstructLen = (int) Math
+ .min(getStripedReader().getBufferSize(), remaining);
+ // step1: read from minimum source DNs required for reconstruction.
+ // The returned success list is the source DNs we do real read from
+ getStripedReader().readMinimumSources(toReconstructLen);
+
+ // step2: decode to reconstruct targets
+ reconstructTargets(toReconstructLen);
+
+ // step3: calculate checksum
+ getChecksum().calculateChunkedSums(targetBuffer.array(), 0,
+ targetBuffer.remaining(), checksumBuf, 0);
+
+ // step4: updates the digest using the checksum array of bytes
+ digester.update(checksumBuf, 0, checksumBuf.length);
+ checksumDataLen += checksumBuf.length;
+ updatePositionInBlock(toReconstructLen);
+ clearBuffers();
+ }
+
+ byte[] digest = digester.digest();
+ md5 = new MD5Hash(digest);
+ md5.write(checksumWriter);
+ }
+
+ private void reconstructTargets(int toReconstructLen) {
+ initDecoderIfNecessary();
+
+ ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
+
+ ByteBuffer[] outputs = new ByteBuffer[1];
+ targetBuffer.limit(toReconstructLen);
+ outputs[0] = targetBuffer;
+ int[] tarIndices = new int[targetIndices.length];
+ for (int i = 0; i < targetIndices.length; i++) {
+ tarIndices[i] = targetIndices[i];
+ }
+ getDecoder().decode(inputs, tarIndices, outputs);
+ }
+
+ /**
+ * Clear all associated buffers.
+ */
+ private void clearBuffers() {
+ getStripedReader().clearBuffers();
+ targetBuffer.clear();
+ }
+
+ public MD5Hash getMD5() {
+ return md5;
+ }
+
+ public long getChecksumDataLen() {
+ return checksumDataLen;
+ }
+}
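The crcPerBlock figure that recalculateChecksum() derives from this class is checksumDataLen / checksumSize. A quick back-of-envelope check of that arithmetic, using assumed chunk and block sizes (not values read from this patch):

public class CrcPerBlockMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;        // assumed chunk size
    int checksumSize = 4;              // a CRC32/CRC32C checksum is 4 bytes
    long blockLen = 64L * 1024 * 1024; // assumed internal block length

    // One CRC per 512-byte chunk, 4 bytes each.
    long chunks = (blockLen + bytesPerChecksum - 1) / bytesPerChecksum;
    long checksumDataLen = chunks * checksumSize;

    // Matches: crcPerBlock = checksumDataLen / checksumSize
    long crcPerBlock = checksumDataLen / checksumSize;
    System.out.println(crcPerBlock); // 131072 CRCs for a full 64MB block
  }
}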
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
new file mode 100644
index 0000000..b800bef
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * StripedBlockReconstructor reconstructs one or more missing striped blocks
+ * in a striped block group; the number of live striped blocks must be no
+ * less than the number of data blocks.
+ */
+@InterfaceAudience.Private
+class StripedBlockReconstructor extends StripedReconstructor
+ implements Runnable {
+
+ private StripedWriter stripedWriter;
+
+ StripedBlockReconstructor(ErasureCodingWorker worker,
+ StripedReconstructionInfo stripedReconInfo) {
+ super(worker, stripedReconInfo);
+
+ stripedWriter = new StripedWriter(this, getDatanode(),
+ getConf(), stripedReconInfo);
+ }
+
+ boolean hasValidTargets() {
+ return stripedWriter.hasValidTargets();
+ }
+
+ @Override
+ public void run() {
+ getDatanode().incrementXmitsInProgress();
+ try {
+ getStripedReader().init();
+
+ stripedWriter.init();
+
+ reconstruct();
+
+ stripedWriter.endTargetBlocks();
+
+ // Currently we don't check the acks for packets; this is similar to
+ // block replication.
+ } catch (Throwable e) {
+ LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e);
+ getDatanode().getMetrics().incrECFailedReconstructionTasks();
+ } finally {
+ getDatanode().decrementXmitsInProgress();
+ getDatanode().getMetrics().incrECReconstructionTasks();
+ getStripedReader().close();
+ stripedWriter.close();
+ }
+ }
+
+ void reconstruct() throws IOException {
+ while (getPositionInBlock() < getMaxTargetLength()) {
+ long remaining = getMaxTargetLength() - getPositionInBlock();
+ final int toReconstructLen =
+ (int) Math.min(getStripedReader().getBufferSize(), remaining);
+
+ // step1: read from minimum source DNs required for reconstruction.
+ // The returned success list is the source DNs we do real read from
+ getStripedReader().readMinimumSources(toReconstructLen);
+
+ // step2: decode to reconstruct targets
+ reconstructTargets(toReconstructLen);
+
+ // step3: transfer data
+ if (stripedWriter.transferData2Targets() == 0) {
+ String error = "Transfer failed for all targets.";
+ throw new IOException(error);
+ }
+
+ updatePositionInBlock(toReconstructLen);
+
+ clearBuffers();
+ }
+ }
+
+ private void reconstructTargets(int toReconstructLen) {
+ initDecoderIfNecessary();
+
+ ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
+
+ int[] erasedIndices = stripedWriter.getRealTargetIndices();
+ ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
+
+ getDecoder().decode(inputs, erasedIndices, outputs);
+
+ stripedWriter.updateRealTargetBuffers(toReconstructLen);
+ }
+
+ /**
+ * Clear all associated buffers.
+ */
+ private void clearBuffers() {
+ getStripedReader().clearBuffers();
+
+ stripedWriter.clearBuffers();
+ }
+}
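Taken together with StripedBlockChecksumReconstructor, this file completes a template-method refactoring: the shared read-and-decode machinery lives in the abstract StripedReconstructor, and the two subclasses differ only in what they do with the decoded bytes. A toy sketch of the shape, with abbreviated names and stub bodies rather than the real implementations:

import java.io.IOException;

abstract class Reconstructor {
  abstract void reconstruct() throws IOException;  // template hook
  void readAndDecodeChunk() { /* shared source-read + EC-decode step */ }
}

// Background reconstruction: decoded data is written to target datanodes.
class BlockReconstructorSketch extends Reconstructor implements Runnable {
  @Override void reconstruct() {
    readAndDecodeChunk();
    // ... transfer decoded data to target datanodes ...
  }
  @Override public void run() { reconstruct(); }
}

// Checksum recalculation: decoded data feeds an MD5-of-CRCs digest instead.
class ChecksumReconstructorSketch extends Reconstructor {
  @Override void reconstruct() {
    readAndDecodeChunk();
    // ... chunk the decoded data, CRC it, update the MD5 digest ...
  }
}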
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index fb7699a..e6d4ceb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -23,11 +23,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.util.DataChecksum;
@@ -85,8 +82,7 @@ class StripedReader {
private final CompletionService<Void> readService;
StripedReader(StripedReconstructor reconstructor, DataNode datanode,
- Configuration conf,
- BlockECReconstructionInfo reconstructionInfo) {
+ Configuration conf, StripedReconstructionInfo stripedReconInfo) {
stripedReadTimeoutInMills = conf.getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
@@ -98,13 +94,11 @@ class StripedReader {
this.datanode = datanode;
this.conf = conf;
- ErasureCodingPolicy ecPolicy = reconstructionInfo.getErasureCodingPolicy();
- dataBlkNum = ecPolicy.getNumDataUnits();
- parityBlkNum = ecPolicy.getNumParityUnits();
+ dataBlkNum = stripedReconInfo.getEcPolicy().getNumDataUnits();
+ parityBlkNum = stripedReconInfo.getEcPolicy().getNumParityUnits();
- ExtendedBlock blockGroup = reconstructionInfo.getExtendedBlock();
- int cellsNum = (int)((blockGroup.getNumBytes() - 1) / ecPolicy.getCellSize()
- + 1);
+ int cellsNum = (int) ((stripedReconInfo.getBlockGroup().getNumBytes() - 1)
+ / stripedReconInfo.getEcPolicy().getCellSize() + 1);
minRequiredSources = Math.min(cellsNum, dataBlkNum);
if (minRequiredSources < dataBlkNum) {
@@ -113,8 +107,10 @@ class StripedReader {
zeroStripeIndices = new short[zeroStripNum];
}
- liveIndices = reconstructionInfo.getLiveBlockIndices();
- sources = reconstructionInfo.getSourceDnInfos();
+ this.liveIndices = stripedReconInfo.getLiveIndices();
+ assert liveIndices != null;
+ this.sources = stripedReconInfo.getSources();
+ assert sources != null;
readers = new ArrayList<>(sources.length);
readService = reconstructor.createReadService();
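The cellsNum expression above is ceiling division: (numBytes - 1) / cellSize + 1 equals ceil(numBytes / cellSize) for any positive length. A small sanity check with assumed figures:

public class CellCountMath {
  public static void main(String[] args) {
    long numBytes = 3_000_000L;  // assumed block-group length
    int cellSize = 1024 * 1024;  // assumed 1MB striping cell

    // (numBytes - 1) / cellSize + 1 is ceiling division for numBytes >= 1.
    int cellsNum = (int) ((numBytes - 1) / cellSize + 1);
    System.out.println(cellsNum); // 3 cells: two full, one partial

    // With fewer cells than data units, the remaining stripes are all-zero,
    // which is why minRequiredSources = min(cellsNum, dataBlkNum).
  }
}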
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
new file mode 100644
index 0000000..a5c328b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+/**
+ * Stores striped block info that can be used for block reconstruction.
+ */
+@InterfaceAudience.Private
+public class StripedReconstructionInfo {
+
+ private final ExtendedBlock blockGroup;
+ private final ErasureCodingPolicy ecPolicy;
+
+ // source info
+ private final byte[] liveIndices;
+ private final DatanodeInfo[] sources;
+
+ // target info
+ private final byte[] targetIndices;
+ private final DatanodeInfo[] targets;
+ private final StorageType[] targetStorageTypes;
+
+ public StripedReconstructionInfo(ExtendedBlock blockGroup,
+ ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
+ byte[] targetIndices) {
+ this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null, null);
+ }
+
+ StripedReconstructionInfo(ExtendedBlock blockGroup,
+ ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
+ DatanodeInfo[] targets, StorageType[] targetStorageTypes) {
+ this(blockGroup, ecPolicy, liveIndices, sources, null, targets,
+ targetStorageTypes);
+ }
+
+ private StripedReconstructionInfo(ExtendedBlock blockGroup,
+ ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
+ byte[] targetIndices, DatanodeInfo[] targets,
+ StorageType[] targetStorageTypes) {
+
+ this.blockGroup = blockGroup;
+ this.ecPolicy = ecPolicy;
+ this.liveIndices = liveIndices;
+ this.sources = sources;
+ this.targetIndices = targetIndices;
+ this.targets = targets;
+ this.targetStorageTypes = targetStorageTypes;
+ }
+
+ ExtendedBlock getBlockGroup() {
+ return blockGroup;
+ }
+
+ ErasureCodingPolicy getEcPolicy() {
+ return ecPolicy;
+ }
+
+ byte[] getLiveIndices() {
+ return liveIndices;
+ }
+
+ DatanodeInfo[] getSources() {
+ return sources;
+ }
+
+ byte[] getTargetIndices() {
+ return targetIndices;
+ }
+
+ DatanodeInfo[] getTargets() {
+ return targets;
+ }
+
+ StorageType[] getTargetStorageTypes() {
+ return targetStorageTypes;
+ }
+}
+
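The two non-private constructors encode the two call sites: the checksum path supplies only the target indices to rebuild, while the reconstruction path supplies target datanodes and storage types to write to. A simplified stand-in class illustrating the distinction (names and types here are hypothetical, not the Hadoop API):

class ReconInfoSketch {
  final byte[] targetIndices;  // set on the checksum-recalculation path
  final String[] targets;      // set on the full-reconstruction path

  private ReconInfoSketch(byte[] targetIndices, String[] targets) {
    this.targetIndices = targetIndices;
    this.targets = targets;
  }

  // Checksum recalculation: we only need to know *which* block to rebuild.
  static ReconInfoSketch forChecksum(byte[] errIndices) {
    return new ReconInfoSketch(errIndices, null);
  }

  // Background reconstruction: we also need *where* to write the result.
  static ReconInfoSketch forReconstruction(String[] targetDns) {
    return new ReconInfoSketch(null, targetDns);
  }

  public static void main(String[] args) {
    ReconInfoSketch a = ReconInfoSketch.forChecksum(new byte[] {1});
    ReconInfoSketch b = ReconInfoSketch.forReconstruction(new String[] {"dn-7"});
    System.out.println((a.targets == null) + " " + (b.targetIndices == null));
  }
}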
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 47a6979..782d091 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@@ -39,6 +38,7 @@ import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadPoolExecutor;
/**
* StripedReconstructor reconstruct one or more missed striped block in the
@@ -94,58 +94,50 @@ import java.util.concurrent.ExecutorCompletionService;
* reconstructed result received by targets?
*/
@InterfaceAudience.Private
-class StripedReconstructor implements Runnable {
- private static final Logger LOG = DataNode.LOG;
+abstract class StripedReconstructor {
+ protected static final Logger LOG = DataNode.LOG;
- private final ErasureCodingWorker worker;
- private final DataNode datanode;
private final Configuration conf;
-
+ private final DataNode datanode;
private final ErasureCodingPolicy ecPolicy;
-
private RawErasureDecoder decoder;
-
private final ExtendedBlock blockGroup;
- private final BitSet liveBitSet;
// position in striped internal block
private long positionInBlock;
-
private StripedReader stripedReader;
-
- private StripedWriter stripedWriter;
-
+ private ThreadPoolExecutor stripedReadPool;
private final CachingStrategy cachingStrategy;
+ private long maxTargetLength = 0L;
+ private final BitSet liveBitSet;
StripedReconstructor(ErasureCodingWorker worker,
- BlockECReconstructionInfo reconstructionInfo) {
- this.worker = worker;
+ StripedReconstructionInfo stripedReconInfo) {
+ this.stripedReadPool = worker.getStripedReadPool();
this.datanode = worker.getDatanode();
this.conf = worker.getConf();
-
- ecPolicy = reconstructionInfo.getErasureCodingPolicy();
-
- blockGroup = reconstructionInfo.getExtendedBlock();
- byte[] liveIndices = reconstructionInfo.getLiveBlockIndices();
- liveBitSet = new BitSet(ecPolicy.getNumDataUnits() +
- ecPolicy.getNumParityUnits());
- for (int i = 0; i < liveIndices.length; i++) {
- liveBitSet.set(liveIndices[i]);
+ this.ecPolicy = stripedReconInfo.getEcPolicy();
+ liveBitSet = new BitSet(
+ ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
+ for (int i = 0; i < stripedReconInfo.getLiveIndices().length; i++) {
+ liveBitSet.set(stripedReconInfo.getLiveIndices()[i]);
}
-
- stripedReader = new StripedReader(this, datanode,
- conf, reconstructionInfo);
- stripedWriter = new StripedWriter(this, datanode,
- conf, reconstructionInfo);
+ blockGroup = stripedReconInfo.getBlockGroup();
+ stripedReader = new StripedReader(this, datanode, conf, stripedReconInfo);
cachingStrategy = CachingStrategy.newDefaultStrategy();
positionInBlock = 0L;
}
- BitSet getLiveBitSet() {
- return liveBitSet;
- }
+ /**
+ * Reconstruct one or more missing striped blocks in the striped block
+ * group; the number of live striped blocks must be no less than the number
+ * of data blocks.
+ *
+ * @throws IOException
+ */
+ abstract void reconstruct() throws IOException;
ByteBuffer allocateBuffer(int length) {
return ByteBuffer.allocate(length);
@@ -160,61 +152,8 @@ class StripedReconstructor implements Runnable {
ecPolicy, i);
}
- boolean hasValidTargets() {
- return stripedWriter.hasValidTargets();
- }
-
- @Override
- public void run() {
- datanode.incrementXmitsInProgress();
- try {
- stripedReader.init();
-
- stripedWriter.init();
-
- reconstructAndTransfer();
-
- stripedWriter.endTargetBlocks();
-
- // Currently we don't check the acks for packets, this is similar as
- // block replication.
- } catch (Throwable e) {
- LOG.warn("Failed to reconstruct striped block: {}", blockGroup, e);
- datanode.getMetrics().incrECFailedReconstructionTasks();
- } finally {
- datanode.decrementXmitsInProgress();
- datanode.getMetrics().incrECReconstructionTasks();
- stripedReader.close();
- stripedWriter.close();
- }
- }
-
- void reconstructAndTransfer() throws IOException {
- while (positionInBlock < stripedWriter.getMaxTargetLength()) {
- long remaining = stripedWriter.getMaxTargetLength() - positionInBlock;
- final int toReconstructLen =
- (int) Math.min(stripedReader.getBufferSize(), remaining);
- // step1: read from minimum source DNs required for reconstruction.
- // The returned success list is the source DNs we do real read from
- stripedReader.readMinimumSources(toReconstructLen);
-
- // step2: decode to reconstruct targets
- reconstructTargets(toReconstructLen);
-
- // step3: transfer data
- if (stripedWriter.transferData2Targets() == 0) {
- String error = "Transfer failed for all targets.";
- throw new IOException(error);
- }
-
- positionInBlock += toReconstructLen;
-
- clearBuffers();
- }
- }
-
// Initialize decoder
- private void initDecoderIfNecessary() {
+ protected void initDecoderIfNecessary() {
if (decoder == null) {
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
@@ -223,32 +162,10 @@ class StripedReconstructor implements Runnable {
}
}
- private void reconstructTargets(int toReconstructLen) {
- initDecoderIfNecessary();
-
- ByteBuffer[] inputs = stripedReader.getInputBuffers(toReconstructLen);
-
- int[] erasedIndices = stripedWriter.getRealTargetIndices();
- ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
-
- decoder.decode(inputs, erasedIndices, outputs);
-
- stripedWriter.updateRealTargetBuffers(toReconstructLen);
- }
-
long getPositionInBlock() {
return positionInBlock;
}
- /**
- * Clear all associated buffers.
- */
- private void clearBuffers() {
- stripedReader.clearBuffers();
-
- stripedWriter.clearBuffers();
- }
-
InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
return NetUtils.createSocketAddr(dnInfo.getXferAddr(
datanode.getDnConf().getConnectToDnViaHostname()));
@@ -258,7 +175,7 @@ class StripedReconstructor implements Runnable {
return stripedReader.getBufferSize();
}
- DataChecksum getChecksum() {
+ public DataChecksum getChecksum() {
return stripedReader.getChecksum();
}
@@ -267,10 +184,42 @@ class StripedReconstructor implements Runnable {
}
CompletionService<Void> createReadService() {
- return new ExecutorCompletionService<>(worker.getStripedReadPool());
+ return new ExecutorCompletionService<>(stripedReadPool);
}
ExtendedBlock getBlockGroup() {
return blockGroup;
}
+
+ BitSet getLiveBitSet() {
+ return liveBitSet;
+ }
+
+ long getMaxTargetLength() {
+ return maxTargetLength;
+ }
+
+ void setMaxTargetLength(long maxTargetLength) {
+ this.maxTargetLength = maxTargetLength;
+ }
+
+ void updatePositionInBlock(long positionInBlockArg) {
+ this.positionInBlock += positionInBlockArg;
+ }
+
+ RawErasureDecoder getDecoder() {
+ return decoder;
+ }
+
+ StripedReader getStripedReader() {
+ return stripedReader;
+ }
+
+ Configuration getConf() {
+ return conf;
+ }
+
+ DataNode getDatanode() {
+ return datanode;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index e2052a3..ca7a3a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -22,11 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
@@ -57,7 +55,6 @@ class StripedWriter {
private final short[] targetIndices;
private boolean hasValidTargets;
private final StorageType[] targetStorageTypes;
- private long maxTargetLength;
private StripedBlockWriter[] writers;
@@ -67,20 +64,19 @@ class StripedWriter {
private int bytesPerChecksum;
private int checksumSize;
- StripedWriter(StripedReconstructor reconstructor,
- DataNode datanode,
- Configuration conf,
- BlockECReconstructionInfo reconstructionInfo) {
+ StripedWriter(StripedReconstructor reconstructor, DataNode datanode,
+ Configuration conf, StripedReconstructionInfo stripedReconInfo) {
this.reconstructor = reconstructor;
this.datanode = datanode;
this.conf = conf;
- ErasureCodingPolicy ecPolicy = reconstructionInfo.getErasureCodingPolicy();
- dataBlkNum = ecPolicy.getNumDataUnits();
- parityBlkNum = ecPolicy.getNumParityUnits();
+ dataBlkNum = stripedReconInfo.getEcPolicy().getNumDataUnits();
+ parityBlkNum = stripedReconInfo.getEcPolicy().getNumParityUnits();
- targets = reconstructionInfo.getTargetDnInfos();
- targetStorageTypes = reconstructionInfo.getTargetStorageTypes();
+ this.targets = stripedReconInfo.getTargets();
+ assert targets != null;
+ this.targetStorageTypes = stripedReconInfo.getTargetStorageTypes();
+ assert targetStorageTypes != null;
writers = new StripedBlockWriter[targets.length];
@@ -88,12 +84,12 @@ class StripedWriter {
Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
"Too much missed striped blocks.");
initTargetIndices();
-
- maxTargetLength = 0L;
+ long maxTargetLength = 0L;
for (short targetIndex : targetIndices) {
maxTargetLength = Math.max(maxTargetLength,
reconstructor.getBlockLen(targetIndex));
}
+ reconstructor.setMaxTargetLength(maxTargetLength);
// targetsStatus store whether some target is success, it will record
// any failed target once, if some target failed (invalid DN or transfer
@@ -126,7 +122,6 @@ class StripedWriter {
BitSet bitset = reconstructor.getLiveBitSet();
int m = 0;
- int k = 0;
hasValidTargets = false;
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
if (!bitset.get(i)) {
@@ -257,10 +252,6 @@ class StripedWriter {
}
}
- long getMaxTargetLength() {
- return maxTargetLength;
- }
-
byte[] getChecksumBuf() {
return checksumBuf;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index 598e76f..d223354 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
@@ -276,6 +277,52 @@ public class TestDecommissionWithStriped {
cleanupFile(dfs, ecFile);
}
+ /**
+ * Tests to verify that the file checksum can still be computed after the
+ * decommission operation.
+ *
+ * Below is the block index list after the decommission; ' marks the
+ * decommissioned node's index.
+ *
+ * 0, 2, 3, 4, 5, 6, 7, 8, 1, 1'
+ *
+ * Note that this list contains a duplicated block index and does not
+ * maintain any particular order.
+ */
+ @Test(timeout = 120000)
+ public void testFileChecksumAfterDecommission() throws Exception {
+ LOG.info("Starting test testFileChecksumAfterDecommission");
+
+ final Path ecFile = new Path(ecDir, "testFileChecksumAfterDecommission");
+ int writeBytes = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS;
+ writeStripedFile(dfs, ecFile, writeBytes);
+ Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+ FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+ final List<DatanodeInfo> decommisionNodes = new ArrayList<DatanodeInfo>();
+ LocatedBlock lb = dfs.getClient().getLocatedBlocks(ecFile.toString(), 0)
+ .get(0);
+ DatanodeInfo[] dnLocs = lb.getLocations();
+ assertEquals(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS, dnLocs.length);
+ int decommNodeIndex = 1;
+
+ // add the node which will be decommissioning
+ decommisionNodes.add(dnLocs[decommNodeIndex]);
+ decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
+ assertEquals(decommisionNodes.size(), fsn.getNumDecomLiveDataNodes());
+ assertNull(checkFile(dfs, ecFile, 9, decommisionNodes, numDNs));
+ StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommisionNodes,
+ null);
+
+ // verify checksum
+ FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
+ LOG.info("fileChecksum1:" + fileChecksum1);
+ LOG.info("fileChecksum2:" + fileChecksum2);
+
+ Assert.assertTrue("Checksum mismatches!",
+ fileChecksum1.equals(fileChecksum2));
+ }
+
private void testDecommission(int writeBytes, int storageCount,
int decomNodeCount, String filename) throws IOException, Exception {
Path ecFile = new Path(ecDir, filename);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
index 7cee344..3bee6be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
@@ -163,17 +163,40 @@ public class TestFileChecksum {
Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
}
- /*
- // TODO: allow datanode failure, HDFS-9833
@Test
- public void testStripedAndReplicatedWithFailure() throws Exception {
- FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
- 10, true);
- FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
- 10, true);
+ public void testStripedFileChecksumWithMissedDataBlocks1() throws Exception {
+ FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, fileSize,
+ false);
+ FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile1,
+ fileSize, true);
- Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
- }*/
+ LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
+ LOG.info("stripedFileChecksumRecon:" + stripedFileChecksumRecon);
+
+ Assert.assertTrue("Checksum mismatches!",
+ stripedFileChecksum1.equals(stripedFileChecksumRecon));
+ }
+
+ @Test
+ public void testStripedFileChecksumWithMissedDataBlocks2() throws Exception {
+ FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, -1,
+ false);
+ FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2, -1,
+ false);
+ FileChecksum stripedFileChecksum2Recon = getFileChecksum(stripedFile2, -1,
+ true);
+
+ LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
+ LOG.info("stripedFileChecksum2:" + stripedFileChecksum1);
+ LOG.info("stripedFileChecksum2Recon:" + stripedFileChecksum2Recon);
+
+ Assert.assertTrue("Checksum mismatches!",
+ stripedFileChecksum1.equals(stripedFileChecksum2));
+ Assert.assertTrue("Checksum mismatches!",
+ stripedFileChecksum1.equals(stripedFileChecksum2Recon));
+ Assert.assertTrue("Checksum mismatches!",
+ stripedFileChecksum2.equals(stripedFileChecksum2Recon));
+ }
private FileChecksum getFileChecksum(String filePath, int range,
boolean killDn) throws Exception {