Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/01/22 18:46:14 UTC
hadoop git commit: HDFS-9646. ErasureCodingWorker may fail when recovering data blocks with length less than the first internal block. Contributed by Jing Zhao.
Repository: hadoop
Updated Branches:
refs/heads/trunk 34a390077 -> 95363bcc7
HDFS-9646. ErasureCodingWorker may fail when recovering data blocks with length less than the first internal block. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/95363bcc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/95363bcc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/95363bcc
Branch: refs/heads/trunk
Commit: 95363bcc7dae28ba9ae2cd7ee9a258fcb58cd932
Parents: 34a3900
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Jan 22 09:46:02 2016 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Jan 22 09:46:02 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSInputStream.java | 48 ++++--
.../hadoop/hdfs/DFSStripedInputStream.java | 2 +-
.../hadoop/hdfs/util/StripedBlockUtil.java | 13 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../erasurecode/ErasureCodingWorker.java | 146 +++++++++++-----
.../server/protocol/BlockECRecoveryCommand.java | 2 +-
.../hdfs/TestReadStripedFileWithDecoding.java | 17 +-
.../hadoop/hdfs/TestRecoverStripedFile.java | 172 ++++++++++++-------
8 files changed, 270 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
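In short: the recovery loop previously ran until positionInBlock reached the length of the first internal block of the group, so when the blocks being recovered were shorter than that first block (typical for a file whose last block group is not full), read and decode lengths could disagree with the target's real length and the worker could fail. The fix bounds the loop by the longest block among the recovery targets and caps every read by the per-iteration recovery length. The following standalone sketch (hypothetical helper names and lengths, not the actual ErasureCodingWorker source) shows the changed loop bound:

// Sketch of the loop-bound change. The old code used lens[0] -- the first
// internal block -- as the bound; the new code uses the longest block among
// the recovery targets.
public class RecoveryLoopSketch {

  static long maxTargetLength(long[] lens, short[] targetIndices) {
    long max = 0;
    for (short i : targetIndices) {
      max = Math.max(max, lens[i]);
    }
    return max;
  }

  public static void main(String[] args) {
    // An RS-6-3 group for a small file: data blocks 0-5 have uneven
    // lengths, parity blocks 6-8 match the longest data block.
    long[] lens = {64, 64, 32, 0, 0, 0, 64, 64, 64};
    short[] targets = {2};     // recover the 32-byte data block
    int bufferSize = 16;

    long positionInBlock = 0;
    long bound = maxTargetLength(lens, targets); // 32, not lens[0] == 64
    while (positionInBlock < bound) {
      int toRecover = (int) Math.min(bufferSize, bound - positionInBlock);
      // ...read from enough sources, decode, transfer toRecover bytes...
      positionInBlock += toRecover;
    }
    System.out.println("recovered " + positionInBlock + " bytes"); // 32
  }
}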
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 3de60b2..3c91ca1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -32,7 +32,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
@@ -909,7 +908,8 @@ public class DFSInputStream extends FSInputStream
}
}
- protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+ protected synchronized int readWithStrategy(ReaderStrategy strategy, int off,
+ int len) throws IOException {
dfsClient.checkOpen();
if (closed.get()) {
throw new IOException("Stream closed");
@@ -959,7 +959,7 @@ public class DFSInputStream extends FSInputStream
// Check if need to report block replicas corruption either read
// was successful or ChecksumException occured.
reportCheckSumFailure(corruptedBlockMap,
- currentLocatedBlock.getLocations().length);
+ currentLocatedBlock.getLocations().length, false);
}
}
}
@@ -1492,7 +1492,8 @@ public class DFSInputStream extends FSInputStream
// Check and report if any block replicas are corrupted.
// BlockMissingException may be caught if all block replicas are
// corrupted.
- reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
+ reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length,
+ false);
}
remaining -= bytesToRead;
@@ -1508,6 +1509,7 @@ public class DFSInputStream extends FSInputStream
/**
* DFSInputStream reports checksum failure.
+ * For replicated blocks, we have the following logic:
* Case I : client has tried multiple data nodes and at least one of the
* attempts has succeeded. We report the other failures as corrupted block to
* namenode.
@@ -1515,29 +1517,39 @@ public class DFSInputStream extends FSInputStream
* only report if the total number of replica is 1. We do not
* report otherwise since this maybe due to the client is a handicapped client
* (who can not read).
+ *
+ * For erasure-coded blocks, each block in corruptedBlockMap is an internal
+ * block in a block group, and there is usually only one DataNode
+ * corresponding to each internal block. For this case we simply report the
+ * corrupted blocks to NameNode and ignore the above logic.
+ *
* @param corruptedBlockMap map of corrupted blocks
* @param dataNodeCount number of data nodes who contains the block replicas
*/
protected void reportCheckSumFailure(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
- int dataNodeCount) {
+ int dataNodeCount, boolean isStriped) {
if (corruptedBlockMap.isEmpty()) {
return;
}
- Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
- .entrySet().iterator();
- Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
- ExtendedBlock blk = entry.getKey();
- Set<DatanodeInfo> dnSet = entry.getValue();
- if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
- || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
- DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
- int i = 0;
- for (DatanodeInfo dn:dnSet) {
- locs[i++] = dn;
+ List<LocatedBlock> reportList = new ArrayList<>(corruptedBlockMap.size());
+ for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+ corruptedBlockMap.entrySet()) {
+ ExtendedBlock blk = entry.getKey();
+ Set<DatanodeInfo> dnSet = entry.getValue();
+ if (isStriped || ((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
+ || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
+ DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
+ int i = 0;
+ for (DatanodeInfo dn:dnSet) {
+ locs[i++] = dn;
+ }
+ reportList.add(new LocatedBlock(blk, locs));
}
- LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) };
- dfsClient.reportChecksumFailure(src, lblocks);
+ }
+ if (reportList.size() > 0) {
+ dfsClient.reportChecksumFailure(src,
+ reportList.toArray(new LocatedBlock[reportList.size()]));
}
corruptedBlockMap.clear();
}
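The reworked reportCheckSumFailure collects all corrupted blocks into a single report and, when isStriped is set, bypasses the replica-count heuristics, since each internal block of a block group usually lives on exactly one DataNode. A minimal sketch of the filtering rule with generic placeholder types (B = block, D = datanode; not the HDFS classes -- the real method builds a LocatedBlock[] and calls dfsClient.reportChecksumFailure once for the whole list):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ReportRuleSketch {
  static <B, D> List<B> blocksToReport(Map<B, Set<D>> corruptedBlockMap,
                                       int dataNodeCount, boolean isStriped) {
    List<B> report = new ArrayList<>(corruptedBlockMap.size());
    for (Map.Entry<B, Set<D>> entry : corruptedBlockMap.entrySet()) {
      Set<D> dnSet = entry.getValue();
      if (isStriped                                  // EC: always report
          || (dnSet.size() > 0 && dnSet.size() < dataNodeCount)
          || (dataNodeCount == 1 && dnSet.size() == dataNodeCount)) {
        report.add(entry.getKey());
      }
    }
    return report;
  }
}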
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 7433256..d15e536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -451,7 +451,7 @@ public class DFSStripedInputStream extends DFSInputStream {
// Check if need to report block replicas corruption either read
// was successful or ChecksumException occured.
reportCheckSumFailure(corruptedBlockMap,
- currentLocatedBlock.getLocations().length);
+ currentLocatedBlock.getLocations().length, true);
}
}
return -1;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index e8653c8..dbd53a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSStripedOutputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -35,6 +34,8 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.*;
@@ -72,6 +73,8 @@ import java.util.concurrent.TimeUnit;
@InterfaceAudience.Private
public class StripedBlockUtil {
+ public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
+
/**
* This method parses a striped block group into individual blocks.
*
@@ -221,15 +224,11 @@ public class StripedBlockUtil {
return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
}
} catch (ExecutionException e) {
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Exception during striped read task", e);
- }
+ LOG.debug("Exception during striped read task", e);
return new StripingChunkReadResult(futures.remove(future),
StripingChunkReadResult.FAILED);
} catch (CancellationException e) {
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Exception during striped read task", e);
- }
+ LOG.debug("Exception during striped read task", e);
return new StripingChunkReadResult(futures.remove(future),
StripingChunkReadResult.CANCELLED);
}
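The isDebugEnabled() guards are dropped here because the debug message is a constant string, so nothing costly happens when DEBUG is off. A small sketch of when such guards do and do not matter with SLF4J (assuming only the standard org.slf4j API):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

  void onFailure(Exception e, Object expensive) {
    LOG.debug("Exception during striped read task", e); // constant message: no guard needed
    LOG.debug("state={}", expensive);                   // placeholder defers toString()
    if (LOG.isDebugEnabled()) {
      LOG.debug("state=" + expensive);                  // eager concatenation: guard still useful
    }
  }
}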
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 43adf62..940fa90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -406,6 +406,9 @@ Trunk (Unreleased)
HDFS-9615. Fix variable name typo in DFSConfigKeys. (Ray Chiang via
Arpit Agarwal)
+ HDFS-9646. ErasureCodingWorker may fail when recovering data blocks with
+ length less than the first internal block. (jing9)
+
BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
HDFS-7347. Configurable erasure coding policy for individual files and
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 5588eec..6ad7164 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -32,8 +32,10 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
@@ -46,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -121,9 +124,8 @@ public final class ErasureCodingWorker {
}
private void initializeStripedReadThreadPool(int num) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using striped reads; pool threads=" + num);
- }
+ LOG.debug("Using striped reads; pool threads=" + num);
+
STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
@@ -148,9 +150,7 @@ public final class ErasureCodingWorker {
}
private void initializeStripedBlkRecoveryThreadPool(int num) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using striped block recovery; pool threads=" + num);
- }
+ LOG.debug("Using striped block recovery; pool threads=" + num);
STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new Daemon.DaemonFactory() {
@@ -368,11 +368,11 @@ public final class ErasureCodingWorker {
* @return StripedReader
*/
private StripedReader addStripedReader(int i, long offsetInBlock) {
- StripedReader reader = new StripedReader(liveIndices[i]);
+ final ExtendedBlock block = getBlock(blockGroup, liveIndices[i]);
+ StripedReader reader = new StripedReader(liveIndices[i], block, sources[i]);
stripedReaders.add(reader);
- BlockReader blockReader = newBlockReader(
- getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]);
+ BlockReader blockReader = newBlockReader(block, offsetInBlock, sources[i]);
if (blockReader != null) {
initChecksumAndBufferSizeIfNeeded(blockReader);
reader.blockReader = blockReader;
@@ -435,19 +435,27 @@ public final class ErasureCodingWorker {
throw new IOException(error);
}
- long firstStripedBlockLength = getBlockLen(blockGroup, 0);
- while (positionInBlock < firstStripedBlockLength) {
- int toRead = Math.min(
- bufferSize, (int)(firstStripedBlockLength - positionInBlock));
+ long maxTargetLength = 0;
+ for (short targetIndex : targetIndices) {
+ maxTargetLength = Math.max(maxTargetLength,
+ getBlockLen(blockGroup, targetIndex));
+ }
+ while (positionInBlock < maxTargetLength) {
+ final int toRecover = (int) Math.min(
+ bufferSize, maxTargetLength - positionInBlock);
// step1: read from minimum source DNs required for reconstruction.
- // The returned success list is the source DNs we do real read from
- success = readMinimumStripedData4Recovery(success);
+ // The returned success list is the source DNs we do real read from
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap = new HashMap<>();
+ try {
+ success = readMinimumStripedData4Recovery(success, toRecover,
+ corruptionMap);
+ } finally {
+ // report corrupted blocks to NN
+ reportCorruptedBlocks(corruptionMap);
+ }
// step2: decode to reconstruct targets
- long remaining = firstStripedBlockLength - positionInBlock;
- int toRecoverLen = remaining < bufferSize ?
- (int)remaining : bufferSize;
- recoverTargets(success, targetsStatus, toRecoverLen);
+ recoverTargets(success, targetsStatus, toRecover);
// step3: transfer data
if (transferData2Targets(targetsStatus) == 0) {
@@ -456,7 +464,7 @@ public final class ErasureCodingWorker {
}
clearBuffers();
- positionInBlock += toRead;
+ positionInBlock += toRecover;
}
endTargetBlocks(targetsStatus);
@@ -513,10 +521,11 @@ public final class ErasureCodingWorker {
}
}
- private long getReadLength(int index) {
+ /** the reading length should not exceed the length for recovery */
+ private int getReadLength(int index, int recoverLength) {
long blockLen = getBlockLen(blockGroup, index);
long remaining = blockLen - positionInBlock;
- return remaining > bufferSize ? bufferSize : remaining;
+ return (int) Math.min(remaining, recoverLength);
}
/**
@@ -529,11 +538,15 @@ public final class ErasureCodingWorker {
* operations and next iteration read.
*
* @param success the initial success list of source DNs we think best
+ * @param recoverLength the length to recover.
* @return updated success list of source DNs we do real read
* @throws IOException
*/
- private int[] readMinimumStripedData4Recovery(final int[] success)
+ private int[] readMinimumStripedData4Recovery(final int[] success,
+ int recoverLength, Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap)
throws IOException {
+ Preconditions.checkArgument(recoverLength >= 0 &&
+ recoverLength <= bufferSize);
int nsuccess = 0;
int[] newSuccess = new int[minRequiredSources];
BitSet used = new BitSet(sources.length);
@@ -543,9 +556,11 @@ public final class ErasureCodingWorker {
*/
for (int i = 0; i < minRequiredSources; i++) {
StripedReader reader = stripedReaders.get(success[i]);
- if (getReadLength(liveIndices[success[i]]) > 0) {
- Callable<Void> readCallable = readFromBlock(
- reader.blockReader, reader.buffer);
+ final int toRead = getReadLength(liveIndices[success[i]],
+ recoverLength);
+ if (toRead > 0) {
+ Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
+ toRead, corruptionMap);
Future<Void> f = readService.submit(readCallable);
futures.put(f, success[i]);
} else {
@@ -570,10 +585,10 @@ public final class ErasureCodingWorker {
StripedReader failedReader = stripedReaders.get(result.index);
closeBlockReader(failedReader.blockReader);
failedReader.blockReader = null;
- resultIndex = scheduleNewRead(used);
+ resultIndex = scheduleNewRead(used, recoverLength, corruptionMap);
} else if (result.state == StripingChunkReadResult.TIMEOUT) {
// If timeout, we also schedule a new read.
- resultIndex = scheduleNewRead(used);
+ resultIndex = scheduleNewRead(used, recoverLength, corruptionMap);
}
if (resultIndex >= 0) {
newSuccess[nsuccess++] = resultIndex;
@@ -601,6 +616,9 @@ public final class ErasureCodingWorker {
}
private void paddingBufferToLen(ByteBuffer buffer, int len) {
+ if (len > buffer.limit()) {
+ buffer.limit(len);
+ }
int toPadding = len - buffer.position();
for (int i = 0; i < toPadding; i++) {
buffer.put((byte) 0);
@@ -648,8 +666,8 @@ public final class ErasureCodingWorker {
int m = 0;
for (int i = 0; i < targetBuffers.length; i++) {
if (targetsStatus[i]) {
+ targetBuffers[i].limit(toRecoverLen);
outputs[m++] = targetBuffers[i];
- outputs[i].limit(toRecoverLen);
}
}
decoder.decode(inputs, erasedIndices, outputs);
@@ -658,7 +676,7 @@ public final class ErasureCodingWorker {
if (targetsStatus[i]) {
long blockLen = getBlockLen(blockGroup, targetIndices[i]);
long remaining = blockLen - positionInBlock;
- if (remaining < 0) {
+ if (remaining <= 0) {
targetBuffers[i].limit(0);
} else if (remaining < toRecoverLen) {
targetBuffers[i].limit((int)remaining);
@@ -678,16 +696,19 @@ public final class ErasureCodingWorker {
* @param used the used source DNs in this iteration.
* @return the array index of source DN if don't need to do real read.
*/
- private int scheduleNewRead(BitSet used) {
+ private int scheduleNewRead(BitSet used, int recoverLength,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
StripedReader reader = null;
// step1: initially we may only have <code>minRequiredSources</code>
// number of StripedReader, and there may be some source DNs we never
// read before, so will try to create StripedReader for one new source DN
// and try to read from it. If found, go to step 3.
int m = stripedReaders.size();
+ int toRead = 0;
while (reader == null && m < sources.length) {
reader = addStripedReader(m, positionInBlock);
- if (getReadLength(liveIndices[m]) > 0) {
+ toRead = getReadLength(liveIndices[m], recoverLength);
+ if (toRead > 0) {
if (reader.blockReader == null) {
reader = null;
m++;
@@ -706,12 +727,14 @@ public final class ErasureCodingWorker {
for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
if (!used.get(i)) {
StripedReader r = stripedReaders.get(i);
- if (getReadLength(liveIndices[i]) > 0) {
+ toRead = getReadLength(liveIndices[i], recoverLength);
+ if (toRead > 0) {
closeBlockReader(r.blockReader);
r.blockReader = newBlockReader(
getBlock(blockGroup, liveIndices[i]), positionInBlock,
sources[i]);
if (r.blockReader != null) {
+ r.buffer.position(0);
m = i;
reader = r;
}
@@ -725,8 +748,8 @@ public final class ErasureCodingWorker {
// step3: schedule if find a correct source DN and need to do real read.
if (reader != null) {
- Callable<Void> readCallable = readFromBlock(
- reader.blockReader, reader.buffer);
+ Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
+ toRead, corruptionMap);
Future<Void> f = readService.submit(readCallable);
futures.put(f, m);
used.set(m);
@@ -742,15 +765,22 @@ public final class ErasureCodingWorker {
}
}
- private Callable<Void> readFromBlock(final BlockReader reader,
- final ByteBuffer buf) {
+ private Callable<Void> readFromBlock(final StripedReader reader,
+ final ByteBuffer buf, final int length,
+ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
return new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
- actualReadFromBlock(reader, buf);
+ buf.limit(length);
+ actualReadFromBlock(reader.blockReader, buf);
return null;
+ } catch (ChecksumException e) {
+ LOG.warn("Found Checksum error for " + reader.block + " from "
+ + reader.source + " at " + e.getPos());
+ addCorruptedBlock(reader.block, reader.source, corruptionMap);
+ throw e;
} catch (IOException e) {
LOG.info(e.getMessage());
throw e;
@@ -760,6 +790,30 @@ public final class ErasureCodingWorker {
};
}
+ private void reportCorruptedBlocks(
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) throws IOException {
+ if (!corruptionMap.isEmpty()) {
+ for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+ corruptionMap.entrySet()) {
+ for (DatanodeInfo dnInfo : entry.getValue()) {
+ datanode.reportRemoteBadBlock(dnInfo, entry.getKey());
+ }
+ }
+ }
+ }
+
+ private void addCorruptedBlock(ExtendedBlock blk, DatanodeInfo node,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
+ Set<DatanodeInfo> dnSet = corruptionMap.get(blk);
+ if (dnSet == null) {
+ dnSet = new HashSet<>();
+ corruptionMap.put(blk, dnSet);
+ }
+ if (!dnSet.contains(node)) {
+ dnSet.add(node);
+ }
+ }
+
/**
* Read bytes from block
*/
@@ -900,14 +954,14 @@ public final class ErasureCodingWorker {
}
if (zeroStripeBuffers != null) {
- for (int i = 0; i < zeroStripeBuffers.length; i++) {
- zeroStripeBuffers[i].clear();
+ for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
+ zeroStripeBuffer.clear();
}
}
- for (int i = 0; i < targetBuffers.length; i++) {
- if (targetBuffers[i] != null) {
- targetBuffers[i].clear();
+ for (ByteBuffer targetBuffer : targetBuffers) {
+ if (targetBuffer != null) {
+ targetBuffer.clear();
}
}
}
@@ -998,9 +1052,13 @@ public final class ErasureCodingWorker {
private final short index; // internal block index
private BlockReader blockReader;
private ByteBuffer buffer;
+ private final ExtendedBlock block;
+ private final DatanodeInfo source;
- private StripedReader(short index) {
+ StripedReader(short index, ExtendedBlock block, DatanodeInfo source) {
this.index = index;
+ this.block = block;
+ this.source = source;
}
}
}
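Two of the subtler hunks above, in isolation: getReadLength now caps each source read by the recovery length (and can go nonpositive for internal blocks that end before the current position), and paddingBufferToLen raises the buffer limit before zero-padding, so a source that contributed fewer bytes than the decode length can still be padded out. A standalone, illustrative sketch (not the ErasureCodingWorker source):

import java.nio.ByteBuffer;

class ReadPadSketch {
  static int readLength(long blockLen, long positionInBlock, int recoverLength) {
    long remaining = blockLen - positionInBlock; // may be <= 0 for short blocks
    return (int) Math.min(remaining, recoverLength);
  }

  static void padToLen(ByteBuffer buffer, int len) {
    if (len > buffer.limit()) {
      buffer.limit(len);        // the fix: grow the limit before padding
    }
    while (buffer.position() < len) {
      buffer.put((byte) 0);
    }
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(64);
    buf.limit(10);              // a short source contributed only 10 bytes
    buf.position(10);
    padToLen(buf, 16);          // decode expects 16 bytes from every input
    System.out.println(buf.position()); // 16
  }
}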
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
index 1cb74b3..d0c1786 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
@@ -136,7 +136,7 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
.append("Recovering ").append(block).append(" From: ")
.append(Arrays.asList(sources)).append(" To: [")
.append(Arrays.asList(targets)).append(")\n")
- .append(" Block Indices: ").append(Arrays.asList(liveBlockIndices))
+ .append(" Block Indices: ").append(Arrays.toString(liveBlockIndices))
.toString();
}
}
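Why the one-line change above matters: on a primitive array, Arrays.asList produces a single-element List<short[]>, so its toString() prints the array reference rather than the indices; Arrays.toString prints the elements. A quick demonstration:

import java.util.Arrays;

class ToStringSketch {
  public static void main(String[] args) {
    short[] liveBlockIndices = {0, 2, 5};
    System.out.println(Arrays.asList(liveBlockIndices));  // e.g. "[[S@4e25154f]"
    System.out.println(Arrays.toString(liveBlockIndices)); // "[0, 2, 5]"
  }
}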
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index 32b0216..b0af50e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
@@ -29,12 +30,16 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -51,6 +56,14 @@ import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
public class TestReadStripedFileWithDecoding {
static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
+ static {
+ ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
+ .getLogger().setLevel(Level.ALL);
+ GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
+ GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
+ }
+
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
@@ -66,9 +79,9 @@ public class TestReadStripedFileWithDecoding {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 0);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
- cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
- .numDataNodes(numDNs).build();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
fs = cluster.getFileSystem();
}
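The MiniDFSCluster.Builder hunk above also fixes a latent test-setup bug: the customized conf (block size, replication stream limits) was never handed to the cluster because a fresh HdfsConfiguration was constructed inline. The pattern in miniature (hypothetical Conf/Cluster classes, not the HDFS ones):

class ConfPassingSketch {
  static class Conf {
    long blockSize = 128 * 1024 * 1024;       // default
  }
  static class Cluster {
    final long blockSize;
    Cluster(Conf conf) { this.blockSize = conf.blockSize; }
  }

  public static void main(String[] args) {
    Conf conf = new Conf();
    conf.blockSize = 64 * 1024;               // customized for the test

    Cluster broken = new Cluster(new Conf()); // bug: uses the 128MB default
    Cluster fixed  = new Cluster(conf);       // fix: uses 64KB
    System.out.println(broken.blockSize + " vs " + fixed.blockSize);
  }
}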
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
index 21352b5..ca9d933 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
@@ -23,11 +23,12 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -64,17 +66,25 @@ public class TestRecoverStripedFile {
static {
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
+ }
+
+ enum RecoveryType {
+ DataOnly,
+ ParityOnly,
+ Any
}
private MiniDFSCluster cluster;
- private Configuration conf;
private DistributedFileSystem fs;
// Map: DatanodeID -> datanode index in cluster
- private Map<DatanodeID, Integer> dnMap = new HashMap<DatanodeID, Integer>();
+ private Map<DatanodeID, Integer> dnMap = new HashMap<>();
+ private final Random random = new Random();
@Before
public void setup() throws IOException {
- conf = new Configuration();
+ final Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
cellSize - 1);
@@ -104,75 +114,140 @@ public class TestRecoverStripedFile {
@Test(timeout = 120000)
public void testRecoverOneParityBlock() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
- assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, 0, 1);
+ assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen,
+ RecoveryType.ParityOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverOneParityBlock1() throws Exception {
int fileLen = cellSize + cellSize/10;
- assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen, 0, 1);
+ assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen,
+ RecoveryType.ParityOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverOneParityBlock2() throws Exception {
int fileLen = 1;
- assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen, 0, 1);
+ assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen,
+ RecoveryType.ParityOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverOneParityBlock3() throws Exception {
int fileLen = 3 * blockSize + blockSize/10;
- assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen, 0, 1);
+ assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen,
+ RecoveryType.ParityOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverThreeParityBlocks() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
- assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3);
+ assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen,
+ RecoveryType.ParityOnly, 3);
}
@Test(timeout = 120000)
public void testRecoverThreeDataBlocks() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
- assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3);
+ assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen,
+ RecoveryType.DataOnly, 3);
}
@Test(timeout = 120000)
public void testRecoverThreeDataBlocks1() throws Exception {
int fileLen = 3 * blockSize + blockSize/10;
- assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen, 1, 3);
+ assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen,
+ RecoveryType.DataOnly, 3);
}
@Test(timeout = 120000)
public void testRecoverOneDataBlock() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
- assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1);
+ assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen,
+ RecoveryType.DataOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverOneDataBlock1() throws Exception {
int fileLen = cellSize + cellSize/10;
- assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen, 1, 1);
+ assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen,
+ RecoveryType.DataOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverOneDataBlock2() throws Exception {
int fileLen = 1;
- assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen, 1, 1);
+ assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen,
+ RecoveryType.DataOnly, 1);
}
@Test(timeout = 120000)
public void testRecoverAnyBlocks() throws Exception {
int fileLen = 3 * blockSize + blockSize/10;
- assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2);
+ assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen,
+ RecoveryType.Any, 2);
}
@Test(timeout = 120000)
public void testRecoverAnyBlocks1() throws Exception {
int fileLen = 10 * blockSize + blockSize/10;
- assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen, 2, 3);
+ assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen,
+ RecoveryType.Any, 3);
}
-
+
+ private int[] generateDeadDnIndices(RecoveryType type, int deadNum,
+ byte[] indices) {
+ List<Integer> deadList = new ArrayList<>(deadNum);
+ while (deadList.size() < deadNum) {
+ int dead = random.nextInt(indices.length);
+ boolean isOfType = true;
+ if (type == RecoveryType.DataOnly) {
+ isOfType = indices[dead] < dataBlkNum;
+ } else if (type == RecoveryType.ParityOnly) {
+ isOfType = indices[dead] >= dataBlkNum;
+ }
+ if (isOfType && !deadList.contains(dead)) {
+ deadList.add(dead);
+ }
+ }
+ int[] d = new int[deadNum];
+ for (int i = 0; i < deadNum; i++) {
+ d[i] = deadList.get(i);
+ }
+ return d;
+ }
+
+ private void shutdownDataNodes(DataNode dn) throws IOException {
+ /*
+ * Kill the datanode which contains one replica
+ * We need to make sure it dead in namenode: clear its update time and
+ * trigger NN to check heartbeat.
+ */
+ dn.shutdown();
+ cluster.setDataNodeDead(dn.getDatanodeId());
+ }
+
+ private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets,
+ RecoveryType type)
+ throws IOException {
+ int stoppedDN = 0;
+ for (Map.Entry<ExtendedBlock, DataNode> target : corruptTargets.entrySet()) {
+ if (stoppedDN == 0 || type != RecoveryType.DataOnly
+ || random.nextBoolean()) {
+ // stop at least one DN to trigger recovery
+ LOG.info("Note: stop DataNode " + target.getValue().getDisplayName()
+ + " with internal block " + target.getKey());
+ shutdownDataNodes(target.getValue());
+ stoppedDN++;
+ } else { // corrupt the data on the DN
+ LOG.info("Note: corrupt data on " + target.getValue().getDisplayName()
+ + " with internal block " + target.getKey());
+ cluster.corruptReplica(target.getValue(), target.getKey());
+ }
+ }
+ return stoppedDN;
+ }
+
/**
* Test the file blocks recovery.
* 1. Check the replica is recovered in the target datanode,
@@ -180,11 +255,7 @@ public class TestRecoverStripedFile {
* 2. Read the file and verify content.
*/
private void assertFileBlocksRecovery(String fileName, int fileLen,
- int recovery, int toRecoverBlockNum) throws Exception {
- if (recovery != 0 && recovery != 1 && recovery != 2) {
- Assert.fail("Invalid recovery: 0 is to recovery parity blocks,"
- + "1 is to recovery data blocks, 2 is any.");
- }
+ RecoveryType type, int toRecoverBlockNum) throws Exception {
if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
}
@@ -192,7 +263,7 @@ public class TestRecoverStripedFile {
Path file = new Path(fileName);
final byte[] data = new byte[fileLen];
- ThreadLocalRandom.current().nextBytes(data);
+ Arrays.fill(data, (byte) 1);
DFSTestUtil.writeFile(fs, file, data);
StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
@@ -209,26 +280,10 @@ public class TestRecoverStripedFile {
for (DatanodeInfo storageInfo : storageInfos) {
bitset.set(dnMap.get(storageInfo));
}
-
- int[] toDead = new int[toRecoverBlockNum];
- int n = 0;
- for (int i = 0; i < indices.length; i++) {
- if (n < toRecoverBlockNum) {
- if (recovery == 0) {
- if (indices[i] >= dataBlkNum) {
- toDead[n++] = i;
- }
- } else if (recovery == 1) {
- if (indices[i] < dataBlkNum) {
- toDead[n++] = i;
- }
- } else {
- toDead[n++] = i;
- }
- } else {
- break;
- }
- }
+
+ int[] dead = generateDeadDnIndices(type, toRecoverBlockNum, indices);
+ LOG.info("Note: indices == " + Arrays.toString(indices)
+ + ". Generate errors on datanodes: " + Arrays.toString(dead));
DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum];
int[] deadDnIndices = new int[toRecoverBlockNum];
@@ -236,46 +291,41 @@ public class TestRecoverStripedFile {
File[] replicas = new File[toRecoverBlockNum];
File[] metadatas = new File[toRecoverBlockNum];
byte[][] replicaContents = new byte[toRecoverBlockNum][];
+ Map<ExtendedBlock, DataNode> errorMap = new HashMap<>(dead.length);
for (int i = 0; i < toRecoverBlockNum; i++) {
- dataDNs[i] = storageInfos[toDead[i]];
+ dataDNs[i] = storageInfos[dead[i]];
deadDnIndices[i] = dnMap.get(dataDNs[i]);
-
+
// Check the block replica file on deadDn before it dead.
blocks[i] = StripedBlockUtil.constructInternalBlock(
- lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]);
+ lastBlock.getBlock(), cellSize, dataBlkNum, indices[dead[i]]);
+ errorMap.put(blocks[i], cluster.getDataNodes().get(deadDnIndices[i]));
replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
// the block replica on the datanode should be the same as expected
assertEquals(replicas[i].length(),
StripedBlockUtil.getInternalBlockLength(
- lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
+ lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[dead[i]]));
assertTrue(metadatas[i].getName().
endsWith(blocks[i].getGenerationStamp() + ".meta"));
+ LOG.info("replica " + i + " locates in file: " + replicas[i]);
replicaContents[i] = DFSTestUtil.readFileAsBytes(replicas[i]);
}
int cellsNum = (fileLen - 1) / cellSize + 1;
int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum;
- for (int i = 0; i < toRecoverBlockNum; i++) {
- /*
- * Kill the datanode which contains one replica
- * We need to make sure it dead in namenode: clear its update time and
- * trigger NN to check heartbeat.
- */
- DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
- dn.shutdown();
- cluster.setDataNodeDead(dn.getDatanodeId());
- }
+ // shutdown datanodes or generate corruption
+ int stoppedDN = generateErrors(errorMap, type);
// Check the locatedBlocks of the file again
locatedBlocks = getLocatedBlocks(file);
lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
storageInfos = lastBlock.getLocations();
- assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
+ assertEquals(storageInfos.length, groupSize - stoppedDN);
int[] targetDNs = new int[dnNum - groupSize];
- n = 0;
+ int n = 0;
for (int i = 0; i < dnNum; i++) {
if (!bitset.get(i)) { // not contain replica of the block.
targetDNs[n++] = i;
@@ -289,9 +339,11 @@ public class TestRecoverStripedFile {
// Check the replica on the new target node.
for (int i = 0; i < toRecoverBlockNum; i++) {
File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
+ LOG.info("replica after recovery " + replicaAfterRecovery);
File metadataAfterRecovery =
cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
assertEquals(replicaAfterRecovery.length(), replicas[i].length());
+ LOG.info("replica before " + replicas[i]);
assertTrue(metadataAfterRecovery.getName().
endsWith(blocks[i].getGenerationStamp() + ".meta"));
byte[] replicaContentAfterRecovery =
@@ -366,7 +418,7 @@ public class TestRecoverStripedFile {
BlockECRecoveryInfo invalidECInfo = new BlockECRecoveryInfo(
new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices,
ErasureCodingPolicyManager.getSystemDefaultPolicy());
- List<BlockECRecoveryInfo> ecTasks = new ArrayList<BlockECRecoveryInfo>();
+ List<BlockECRecoveryInfo> ecTasks = new ArrayList<>();
ecTasks.add(invalidECInfo);
dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
}