You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/22 00:47:40 UTC
hadoop git commit: HDFS-8760. Erasure Coding: reuse BlockReader when
reading the same block in pread. Contributed by Jing Zhao.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7285 29495cb8f -> f8f7a923b
HDFS-8760. Erasure Coding: reuse BlockReader when reading the same block in pread. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f8f7a923
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f8f7a923
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f8f7a923
Branch: refs/heads/HDFS-7285
Commit: f8f7a923b76abcd1d0242c15a536b20af1c1695e
Parents: 29495cb
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Jul 21 15:47:26 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Jul 21 15:47:26 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../org/apache/hadoop/hdfs/DFSInputStream.java | 54 +--
.../hadoop/hdfs/DFSStripedInputStream.java | 350 ++++++++-----------
.../apache/hadoop/hdfs/server/mover/Mover.java | 3 -
.../hadoop/hdfs/util/StripedBlockUtil.java | 35 --
.../apache/hadoop/hdfs/StripedFileTestUtil.java | 17 +-
.../hadoop/hdfs/TestWriteReadStripedFile.java | 11 +-
7 files changed, 172 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8f7a923/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 4709388..10a8cde 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -364,3 +364,6 @@
to be consistent with trunk. (zhz)
HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock.(waltersu4549)
+
+ HDFS-8760. Erasure Coding: reuse BlockReader when reading the same block in pread.
+ (jing9)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8f7a923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 5b10ffe..6c3f0ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -44,7 +44,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
@@ -1140,41 +1139,24 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
- * Used when reading contiguous blocks
- */
- private void actualGetFromOneDataNode(final DNAddrPair datanode,
- LocatedBlock block, final long start, final long end, byte[] buf,
- int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
- throws IOException {
- final int length = (int) (end - start + 1);
- actualGetFromOneDataNode(datanode, block, start, end, buf,
- new int[]{offset}, new int[]{length}, corruptedBlockMap);
- }
-
- /**
* Read data from one DataNode.
* @param datanode the datanode from which to read data
* @param block the located block containing the requested data
* @param startInBlk the startInBlk offset of the block
* @param endInBlk the endInBlk offset of the block
* @param buf the given byte array into which the data is read
- * @param offsets the data may be read into multiple segments of the buf
- * (when reading a striped block). this array indicates the
- * offset of each buf segment.
- * @param lengths the length of each buf segment
+ * @param offset the offset in buf
* @param corruptedBlockMap map recording list of datanodes with corrupted
* block replica
*/
- void actualGetFromOneDataNode(final DNAddrPair datanode,
- LocatedBlock block, final long startInBlk, final long endInBlk,
- byte[] buf, int[] offsets, int[] lengths,
+ void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block,
+ final long startInBlk, final long endInBlk, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
final int len = (int) (endInBlk - startInBlk + 1);
- checkReadPortions(offsets, lengths, len);
while (true) {
// cached block locations may have been updated by chooseDataNode()
@@ -1186,13 +1168,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
DFSClientFaultInjector.get().fetchFromDatanodeException();
reader = getBlockReader(block, startInBlk, len, datanode.addr,
datanode.storageType, datanode.info);
- for (int i = 0; i < offsets.length; i++) {
- int nread = reader.readAll(buf, offsets[i], lengths[i]);
- updateReadStatistics(readStatistics, nread, reader);
- if (nread != lengths[i]) {
- throw new IOException("truncated return from reader.read(): " +
- "excpected " + lengths[i] + ", got " + nread);
- }
+ int nread = reader.readAll(buf, offset, len);
+ updateReadStatistics(readStatistics, nread, reader);
+ if (nread != len) {
+ throw new IOException("truncated return from reader.read(): " +
+ "excpected " + len + ", got " + nread);
}
DFSClientFaultInjector.get().readFromDatanodeDelay();
return;
@@ -1248,24 +1228,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
- * This method verifies that the read portions are valid and do not overlap
- * with each other.
- */
- private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
- Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
- int sum = 0;
- for (int i = 0; i < lengths.length; i++) {
- if (i > 0) {
- int gap = offsets[i] - offsets[i - 1];
- // make sure read portions do not overlap with each other
- Preconditions.checkArgument(gap >= lengths[i - 1]);
- }
- sum += lengths[i];
- }
- Preconditions.checkArgument(sum == totalLen);
- }
-
- /**
* Like {@link #fetchBlockByteRange}except we start up a second, parallel,
* 'hedged' read if the first read is taking longer than configured amount of
* time. We then wait on which ever read returns first.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8f7a923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 7509003..eecdf67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -31,14 +31,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.finalizeDecodeInputs;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getStartOffsetsForInternalBlocks;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
@@ -48,10 +40,6 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
import java.io.EOFException;
import java.io.IOException;
@@ -166,7 +154,6 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
private StripeRange curStripeRange;
private final CompletionService<Void> readingService;
- private ReaderRetryPolicy retry;
DFSStripedInputStream(DFSClient dfsClient, String src,
boolean verifyChecksum, ECSchema schema, int cellSize,
@@ -198,18 +185,6 @@ public class DFSStripedInputStream extends DFSInputStream {
curStripeRange = new StripeRange(0, 0);
}
- @Override
- public synchronized int read(final ByteBuffer buf) throws IOException {
- ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
- TraceScope scope =
- dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src);
- try {
- return readWithStrategy(byteBufferReader, 0, buf.remaining());
- } finally {
- scope.close();
- }
- }
-
/**
* When seeking into a new block group, create blockReader for each internal
* block in the group.
@@ -229,33 +204,6 @@ public class DFSStripedInputStream extends DFSInputStream {
this.blockEnd = targetBlockGroup.getStartOffset() +
targetBlockGroup.getBlockSize() - 1;
currentLocatedBlock = targetBlockGroup;
-
- final long offsetIntoBlockGroup = getOffsetInBlockGroup();
- LocatedBlock[] targetBlocks = parseStripedBlockGroup(
- targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
- // The purpose is to get start offset into each block.
- long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema,
- cellSize, targetBlockGroup, offsetIntoBlockGroup);
- Preconditions.checkState(offsetsForInternalBlocks.length ==
- dataBlkNum + parityBlkNum);
- long minOffset = offsetsForInternalBlocks[dataBlkNum];
-
- retry = new ReaderRetryPolicy();
- for (int i = 0; i < dataBlkNum; i++) {
- LocatedBlock targetBlock = targetBlocks[i];
- if (targetBlock != null) {
- DNAddrPair dnInfo = getBestNodeDNAddrPair(targetBlock, null);
- if (dnInfo != null) {
- BlockReader reader = getBlockReaderWithRetry(targetBlock,
- minOffset, targetBlock.getBlockSize() - minOffset,
- dnInfo.addr, dnInfo.storageType, dnInfo.info, target, retry);
- if (reader != null) {
- blockReaders[i] = new BlockReaderInfo(reader, targetBlock,
- dnInfo.info, minOffset);
- }
- }
- }
- }
}
/**
@@ -308,16 +256,16 @@ public class DFSStripedInputStream extends DFSInputStream {
return;
}
for (int i = 0; i < groupSize; i++) {
- closeReader(i);
+ closeReader(blockReaders[i]);
blockReaders[i] = null;
}
blockEnd = -1;
}
- private void closeReader(int index) {
- if (blockReaders[index] != null) {
- IOUtils.cleanup(DFSClient.LOG, blockReaders[index].reader);
- blockReaders[index].skip();
+ private void closeReader(BlockReaderInfo readerInfo) {
+ if (readerInfo != null) {
+ IOUtils.cleanup(DFSClient.LOG, readerInfo.reader);
+ readerInfo.skip();
}
}
@@ -358,17 +306,17 @@ public class DFSStripedInputStream extends DFSInputStream {
for (AlignedStripe stripe : stripes) {
// Parse group to get chosen DN location
StripeReader sreader = new StatefulStripeReader(readingService, stripe,
- blks, corruptedBlockMap);
+ blks, blockReaders, corruptedBlockMap);
sreader.readStripe();
}
curStripeBuf.position(stripeBufOffset);
curStripeBuf.limit(stripeLimit);
}
- private Callable<Void> readCell(final BlockReader reader,
+ private Callable<Void> readCells(final BlockReader reader,
final DatanodeInfo datanode, final long currentReaderOffset,
- final long targetReaderOffset, final ByteBufferStrategy strategy,
- final int targetLength, final ExtendedBlock currentBlock,
+ final long targetReaderOffset, final ByteBufferStrategy[] strategies,
+ final ExtendedBlock currentBlock,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
return new Callable<Void>() {
@Override
@@ -386,27 +334,31 @@ public class DFSStripedInputStream extends DFSInputStream {
skipped == targetReaderOffset - currentReaderOffset);
}
int result = 0;
- while (result < targetLength) {
- int ret = readToBuffer(reader, datanode, strategy, currentBlock,
+ for (ByteBufferStrategy strategy : strategies) {
+ result += readToBuffer(reader, datanode, strategy, currentBlock,
corruptedBlockMap);
- if (ret < 0) {
- throw new IOException("Unexpected EOS from the reader");
- }
- result += ret;
}
- updateReadStatistics(readStatistics, targetLength, reader);
return null;
}
};
}
private int readToBuffer(BlockReader blockReader,
- DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
+ DatanodeInfo currentNode, ByteBufferStrategy strategy,
ExtendedBlock currentBlock,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
+ final int targetLength = strategy.buf.remaining();
+ int length = 0;
try {
- return readerStrategy.doRead(blockReader, 0, 0);
+ while (length < targetLength) {
+ int ret = strategy.doRead(blockReader, 0, 0);
+ if (ret < 0) {
+ throw new IOException("Unexpected EOS from the reader");
+ }
+ length += ret;
+ }
+ return length;
} catch (ChecksumException ce) {
DFSClient.LOG.warn("Found Checksum error for "
+ currentBlock + " from " + currentNode
@@ -572,61 +524,49 @@ public class DFSStripedInputStream extends DFSInputStream {
// Refresh the striped block group
LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
- AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, cellSize,
- blockGroup, start, end, buf, offset);
+ AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
+ schema, cellSize, blockGroup, start, end, buf, offset);
CompletionService<Void> readService = new ExecutorCompletionService<>(
dfsClient.getStripedReadsThreadPool());
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
blockGroup, cellSize, dataBlkNum, parityBlkNum);
- for (AlignedStripe stripe : stripes) {
- // Parse group to get chosen DN location
- StripeReader preader = new PositionStripeReader(readService, stripe,
- blks, corruptedBlockMap);
- preader.readStripe();
- }
- }
-
- private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
- final LocatedBlock block, final long start, final long end,
- final byte[] buf, final int[] offsets, final int[] lengths,
- final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
- final int hedgedReadId) {
- final Span parentSpan = Trace.currentSpan();
- return new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- TraceScope scope =
- Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
- try {
- actualGetFromOneDataNode(datanode, block, start,
- end, buf, offsets, lengths, corruptedBlockMap);
- } finally {
- scope.close();
- }
- return null;
+ final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
+ try {
+ for (AlignedStripe stripe : stripes) {
+ // Parse group to get chosen DN location
+ StripeReader preader = new PositionStripeReader(readService, stripe,
+ blks, preaderInfos, corruptedBlockMap);
+ preader.readStripe();
}
- };
+ } finally {
+ for (BlockReaderInfo preaderInfo : preaderInfos) {
+ closeReader(preaderInfo);
+ }
+ }
}
+ /**
+ * The reader for reading a complete {@link AlignedStripe}. Note that an
+ * {@link AlignedStripe} may cross multiple stripes with cellSize width.
+ */
private abstract class StripeReader {
final Map<Future<Void>, Integer> futures = new HashMap<>();
final AlignedStripe alignedStripe;
final CompletionService<Void> service;
final LocatedBlock[] targetBlocks;
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap;
+ final BlockReaderInfo[] readerInfos;
StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
- LocatedBlock[] targetBlocks,
+ LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
this.service = service;
this.alignedStripe = alignedStripe;
this.targetBlocks = targetBlocks;
+ this.readerInfos = readerInfos;
this.corruptedBlockMap = corruptedBlockMap;
}
- abstract boolean readChunk(final CompletionService<Void> service,
- final LocatedBlock block, int chunkIndex);
-
/** prepare all the data chunks */
abstract void prepareDecodeInputs();
@@ -635,7 +575,12 @@ public class DFSStripedInputStream extends DFSInputStream {
abstract void decode();
- abstract void updateState4SuccessRead(StripingChunkReadResult result);
+ void updateState4SuccessRead(StripingChunkReadResult result) {
+ Preconditions.checkArgument(
+ result.state == StripingChunkReadResult.SUCCESSFUL);
+ readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
+ + alignedStripe.getSpanInBlock());
+ }
private void checkMissingBlocks() throws IOException {
if (alignedStripe.missingChunksNum > parityBlkNum) {
@@ -654,7 +599,7 @@ public class DFSStripedInputStream extends DFSInputStream {
for (int i = 0; i < dataBlkNum; i++) {
Preconditions.checkNotNull(alignedStripe.chunks[i]);
if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
- if (!readChunk(service, targetBlocks[i], i)) {
+ if (!readChunk(targetBlocks[i], i)) {
alignedStripe.missingChunksNum++;
}
}
@@ -666,7 +611,7 @@ public class DFSStripedInputStream extends DFSInputStream {
for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
i++) {
if (alignedStripe.chunks[i] == null) {
- if (prepareParityChunk(i) && readChunk(service, targetBlocks[i], i)) {
+ if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
j++;
} else {
alignedStripe.missingChunksNum++;
@@ -676,12 +621,75 @@ public class DFSStripedInputStream extends DFSInputStream {
checkMissingBlocks();
}
+ boolean createBlockReader(LocatedBlock block, int chunkIndex)
+ throws IOException {
+ DNAddrPair dnInfo = getBestNodeDNAddrPair(block, null);
+ if (dnInfo != null) {
+ BlockReader reader = getBlockReaderWithRetry(block,
+ alignedStripe.getOffsetInBlock(),
+ block.getBlockSize() - alignedStripe.getOffsetInBlock(),
+ dnInfo.addr, dnInfo.storageType, dnInfo.info,
+ block.getStartOffset(), new ReaderRetryPolicy());
+ if (reader != null) {
+ readerInfos[chunkIndex] = new BlockReaderInfo(reader, block,
+ dnInfo.info, alignedStripe.getOffsetInBlock());
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
+ if (chunk.byteBuffer != null) {
+ ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
+ return new ByteBufferStrategy[]{strategy};
+ } else {
+ ByteBufferStrategy[] strategies =
+ new ByteBufferStrategy[chunk.byteArray.getOffsets().length];
+ for (int i = 0; i < strategies.length; i++) {
+ ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
+ chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
+ strategies[i] = new ByteBufferStrategy(buffer);
+ }
+ return strategies;
+ }
+ }
+
+ boolean readChunk(final LocatedBlock block, int chunkIndex)
+ throws IOException {
+ final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+ if (block == null) {
+ chunk.state = StripingChunk.MISSING;
+ return false;
+ }
+ if (readerInfos[chunkIndex] == null) {
+ if (!createBlockReader(block, chunkIndex)) {
+ chunk.state = StripingChunk.MISSING;
+ return false;
+ }
+ } else if (readerInfos[chunkIndex].shouldSkip) {
+ chunk.state = StripingChunk.MISSING;
+ return false;
+ }
+
+ chunk.state = StripingChunk.PENDING;
+ Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
+ readerInfos[chunkIndex].datanode,
+ readerInfos[chunkIndex].blockReaderOffset,
+ alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
+ block.getBlock(), corruptedBlockMap);
+
+ Future<Void> request = service.submit(readCallable);
+ futures.put(request, chunkIndex);
+ return true;
+ }
+
/** read the whole stripe. do decoding if necessary */
void readStripe() throws IOException {
for (int i = 0; i < dataBlkNum; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
- if (!readChunk(service, targetBlocks[i], i)) {
+ if (!readChunk(targetBlocks[i], i)) {
alignedStripe.missingChunksNum++;
}
}
@@ -700,8 +708,8 @@ public class DFSStripedInputStream extends DFSInputStream {
// first read failure
while (!futures.isEmpty()) {
try {
- StripingChunkReadResult r = getNextCompletedStripedRead(service,
- futures, 0);
+ StripingChunkReadResult r = StripedBlockUtil
+ .getNextCompletedStripedRead(service, futures, 0);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
+ alignedStripe);
@@ -721,7 +729,7 @@ public class DFSStripedInputStream extends DFSInputStream {
} else {
returnedChunk.state = StripingChunk.MISSING;
// close the corresponding reader
- closeReader(r.index);
+ closeReader(readerInfos[r.index]);
final int missing = alignedStripe.missingChunksNum;
alignedStripe.missingChunksNum++;
@@ -750,48 +758,17 @@ public class DFSStripedInputStream extends DFSInputStream {
PositionStripeReader(CompletionService<Void> service,
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
+ BlockReaderInfo[] readerInfos,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
- super(service, alignedStripe, targetBlocks, corruptedBlockMap);
- }
-
- @Override
- boolean readChunk(final CompletionService<Void> service,
- final LocatedBlock block, int chunkIndex) {
- final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
- if (block == null) {
- chunk.state = StripingChunk.MISSING;
- return false;
- }
- DatanodeInfo loc = block.getLocations()[0];
- StorageType type = block.getStorageTypes()[0];
- DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
- loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
- type);
- chunk.state = StripingChunk.PENDING;
- Callable<Void> readCallable = getFromOneDataNode(dnAddr,
- block, alignedStripe.getOffsetInBlock(),
- alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1,
- chunk.byteArray.buf(), chunk.byteArray.getOffsets(),
- chunk.byteArray.getLengths(), corruptedBlockMap, chunkIndex);
- Future<Void> getFromDNRequest = service.submit(readCallable);
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Submitting striped read request for " + chunkIndex
- + ". Info of the block: " + block + ", offset in block is "
- + alignedStripe.getOffsetInBlock() + ", end is "
- + (alignedStripe.getOffsetInBlock()
- + alignedStripe.getSpanInBlock() - 1));
- }
- futures.put(getFromDNRequest, chunkIndex);
- return true;
+ super(service, alignedStripe, targetBlocks, readerInfos,
+ corruptedBlockMap);
}
@Override
- void updateState4SuccessRead(StripingChunkReadResult r) {}
-
- @Override
void prepareDecodeInputs() {
if (decodeInputs == null) {
- decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);
+ decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe,
+ dataBlkNum, parityBlkNum);
}
}
@@ -799,8 +776,8 @@ public class DFSStripedInputStream extends DFSInputStream {
boolean prepareParityChunk(int index) {
Preconditions.checkState(index >= dataBlkNum &&
alignedStripe.chunks[index] == null);
- final int decodeIndex = convertIndex4Decode(index, dataBlkNum,
- parityBlkNum);
+ final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
+ dataBlkNum, parityBlkNum);
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
alignedStripe.chunks[index].addByteArraySlice(0,
(int) alignedStripe.getSpanInBlock());
@@ -809,10 +786,10 @@ public class DFSStripedInputStream extends DFSInputStream {
@Override
void decode() {
- finalizeDecodeInputs(decodeInputs, dataBlkNum, parityBlkNum,
- alignedStripe);
- decodeAndFillBuffer(decodeInputs, alignedStripe, dataBlkNum,
- parityBlkNum, decoder);
+ StripedBlockUtil.finalizeDecodeInputs(decodeInputs, dataBlkNum,
+ parityBlkNum, alignedStripe);
+ StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe,
+ dataBlkNum, parityBlkNum, decoder);
}
}
@@ -821,36 +798,10 @@ public class DFSStripedInputStream extends DFSInputStream {
StatefulStripeReader(CompletionService<Void> service,
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
+ BlockReaderInfo[] readerInfos,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
- super(service, alignedStripe, targetBlocks, corruptedBlockMap);
- }
-
- @Override
- boolean readChunk(final CompletionService<Void> service,
- final LocatedBlock block, int chunkIndex) {
- final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
- final BlockReaderInfo readerInfo = blockReaders[chunkIndex];
- if (readerInfo == null || block == null || readerInfo.shouldSkip) {
- chunk.state = StripingChunk.MISSING;
- return false;
- }
- chunk.state = StripingChunk.PENDING;
- ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
- Callable<Void> readCallable = readCell(readerInfo.reader,
- readerInfo.datanode, readerInfo.blockReaderOffset,
- alignedStripe.getOffsetInBlock(), strategy,
- chunk.byteBuffer.remaining(), block.getBlock(), corruptedBlockMap);
- Future<Void> request = readingService.submit(readCallable);
- futures.put(request, chunkIndex);
- return true;
- }
-
- @Override
- void updateState4SuccessRead(StripingChunkReadResult result) {
- Preconditions.checkArgument(
- result.state == StripingChunkReadResult.SUCCESSFUL);
- blockReaders[result.index].setOffset(alignedStripe.getOffsetInBlock()
- + alignedStripe.getSpanInBlock());
+ super(service, alignedStripe, targetBlocks, readerInfos,
+ corruptedBlockMap);
}
@Override
@@ -864,8 +815,8 @@ public class DFSStripedInputStream extends DFSInputStream {
int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
cur.position(pos);
cur.limit((int) (pos + range.spanInBlock));
- final int decodeIndex = convertIndex4Decode(i, dataBlkNum,
- parityBlkNum);
+ final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
+ dataBlkNum, parityBlkNum);
decodeInputs[decodeIndex] = cur.slice();
if (alignedStripe.chunks[i] == null) {
alignedStripe.chunks[i] = new StripingChunk(
@@ -884,45 +835,20 @@ public class DFSStripedInputStream extends DFSInputStream {
// we have failed the block reader before
return false;
}
- final int decodeIndex = convertIndex4Decode(index, dataBlkNum,
- parityBlkNum);
+ final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
+ dataBlkNum, parityBlkNum);
decodeInputs[decodeIndex] = ByteBuffer.allocateDirect(
(int) alignedStripe.range.spanInBlock);
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
- if (blockReaders[index] == null && !prepareParityBlockReader(index)) {
- alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
- return false;
- }
return true;
}
- private boolean prepareParityBlockReader(int i) throws IOException {
- // prepare the block reader for the parity chunk
- LocatedBlock targetBlock = targetBlocks[i];
- if (targetBlock != null) {
- final long offsetInBlock = alignedStripe.getOffsetInBlock();
- DNAddrPair dnInfo = getBestNodeDNAddrPair(targetBlock, null);
- if (dnInfo != null) {
- BlockReader reader = getBlockReaderWithRetry(targetBlock,
- offsetInBlock, targetBlock.getBlockSize() - offsetInBlock,
- dnInfo.addr, dnInfo.storageType, dnInfo.info,
- DFSStripedInputStream.this.getPos(), retry);
- if (reader != null) {
- blockReaders[i] = new BlockReaderInfo(reader, targetBlock,
- dnInfo.info, offsetInBlock);
- return true;
- }
- }
- }
- return false;
- }
-
@Override
void decode() {
// TODO no copy for data chunks. this depends on HADOOP-12047
final int span = (int) alignedStripe.getSpanInBlock();
for (int i = 0; i < alignedStripe.chunks.length; i++) {
- final int decodeIndex = convertIndex4Decode(i,
+ final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
dataBlkNum, parityBlkNum);
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
@@ -941,7 +867,7 @@ public class DFSStripedInputStream extends DFSInputStream {
for (int i = 0; i < alignedStripe.chunks.length; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.MISSING) {
- decodeIndices[pos++] = convertIndex4Decode(i,
+ decodeIndices[pos++] = StripedBlockUtil.convertIndex4Decode(i,
dataBlkNum, parityBlkNum);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8f7a923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index ddfd1ea..dcab075 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -53,8 +52,6 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
-
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8f7a923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 9b0939c..3e5ef43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -477,41 +477,6 @@ public class StripedBlockUtil {
}
/**
- * Given a logical start offset in a block group, calculate the physical
- * start offset into each stored internal block.
- */
- public static long[] getStartOffsetsForInternalBlocks(ECSchema ecSchema,
- int cellSize, LocatedStripedBlock blockGroup, long rangeStartInBlockGroup) {
- Preconditions.checkArgument(
- rangeStartInBlockGroup < blockGroup.getBlockSize());
- int dataBlkNum = ecSchema.getNumDataUnits();
- int parityBlkNum = ecSchema.getNumParityUnits();
- long[] startOffsets = new long[dataBlkNum + parityBlkNum];
- Arrays.fill(startOffsets, -1L);
- int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
- StripingCell firstCell = new StripingCell(ecSchema, cellSize,
- firstCellIdxInBG, (int) (rangeStartInBlockGroup % cellSize));
- startOffsets[firstCell.idxInStripe] =
- firstCell.idxInInternalBlk * cellSize + firstCell.offset;
- long earliestStart = startOffsets[firstCell.idxInStripe];
- for (int i = 1; i < dataBlkNum; i++) {
- int idx = firstCellIdxInBG + i;
- if (idx * (long) cellSize >= blockGroup.getBlockSize()) {
- break;
- }
- StripingCell cell = new StripingCell(ecSchema, cellSize, idx, 0);
- startOffsets[cell.idxInStripe] = cell.idxInInternalBlk * (long) cellSize;
- if (startOffsets[cell.idxInStripe] < earliestStart) {
- earliestStart = startOffsets[cell.idxInStripe];
- }
- }
- for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
- startOffsets[i] = earliestStart;
- }
- return startOffsets;
- }
-
- /**
* Given a logical byte range, mapped to each {@link StripingCell}, calculate
* the physical byte range (inclusive) on each stored internal block.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8f7a923/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 815a50d..2866a0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -79,10 +79,19 @@ public class StripedFileTestUtil {
for (int startOffset : startOffsets) {
startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
int remaining = fileLength - startOffset;
- in.readFully(startOffset, buf, 0, remaining);
- for (int i = 0; i < remaining; i++) {
- Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
- "same", expected[startOffset + i], buf[i]);
+ int offset = startOffset;
+ final byte[] result = new byte[remaining];
+ while (remaining > 0) {
+ int target = Math.min(remaining, buf.length);
+ in.readFully(offset, buf, 0, target);
+ System.arraycopy(buf, 0, result, offset - startOffset, target);
+ remaining -= target;
+ offset += target;
+ }
+ for (int i = 0; i < fileLength - startOffset; i++) {
+ Assert.assertEquals("Byte at " + (startOffset + i) + " is different, "
+ + "the startOffset is " + startOffset,
+ expected[startOffset + i], result[i]);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8f7a923/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index 2f9322d..089a134 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -19,13 +19,16 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -45,6 +48,11 @@ public class TestWriteReadStripedFile {
private static FileSystem fs;
private static Configuration conf = new HdfsConfiguration();
+ static {
+ ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
+ .getLogger().setLevel(Level.ALL);
+ }
+
@Before
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
@@ -232,7 +240,8 @@ public class TestWriteReadStripedFile {
byte[] smallBuf = new byte[1024];
byte[] largeBuf = new byte[fileLength + 100];
- StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+ // TODO: HDFS-8797
+ //StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);