Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/17 01:58:52 UTC
[24/50] hadoop git commit: HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream (stateful read). Contributed by Jing Zhao
HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream (stateful read). Contributed by Jing Zhao
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/da2a33f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/da2a33f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/da2a33f0
Branch: refs/heads/HDFS-7285
Commit: da2a33f0969ea673bc16d8515e72c461685379b1
Parents: 27b2346
Author: Zhe Zhang <zh...@apache.org>
Authored: Wed Apr 29 15:53:31 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:16:04 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../hadoop/hdfs/DFSStripedInputStream.java | 336 ++++++++-----------
2 files changed, 150 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
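For orientation before the diff: a stateful read of a striped file goes through the ordinary FileSystem API, with DFSStripedInputStream doing the per-cell work behind FSDataInputStream. A minimal usage sketch, assuming an erasure-coded path (the path and buffer size here are illustrative, not from this commit):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class StripedReadExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Stateful read: each read() advances the stream position, which is
        // the code path this commit simplifies.
        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(new Path("/ec/dir/file"))) {
          byte[] buf = new byte[64 * 1024];
          int n;
          while ((n = in.read(buf)) > 0) {
            // consume n bytes
          }
        }
      }
    }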
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da2a33f0/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 9b4bf24..6a9bdee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -143,3 +143,6 @@
HDFS-8235. Erasure Coding: Create DFSStripedInputStream in DFSClient#open.
(Kai Sasaki via jing9)
+
+ HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream
+ (stateful read). (Jing Zhao via Zhe Zhang)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da2a33f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index f6f7ed2..3da7306 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -22,11 +22,8 @@ import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
@@ -126,23 +123,42 @@ public class DFSStripedInputStream extends DFSInputStream {
return results;
}
+ private static class ReaderRetryPolicy {
+ private int fetchEncryptionKeyTimes = 1;
+ private int fetchTokenTimes = 1;
+
+ void refetchEncryptionKey() {
+ fetchEncryptionKeyTimes--;
+ }
+
+ void refetchToken() {
+ fetchTokenTimes--;
+ }
+
+ boolean shouldRefetchEncryptionKey() {
+ return fetchEncryptionKeyTimes > 0;
+ }
+
+ boolean shouldRefetchToken() {
+ return fetchTokenTimes > 0;
+ }
+ }
+
private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
- private BlockReader[] blockReaders = null;
- private DatanodeInfo[] currentNodes = null;
+ private final BlockReader[] blockReaders = new BlockReader[groupSize];
+ private final DatanodeInfo[] currentNodes = new DatanodeInfo[groupSize];
private final int cellSize;
private final short dataBlkNum;
private final short parityBlkNum;
- private final ECInfo ecInfo;
- DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, ECInfo info)
- throws IOException {
+ DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
+ ECInfo ecInfo) throws IOException {
super(dfsClient, src, verifyChecksum);
// ECInfo is restored from NN just before reading striped file.
- assert info != null;
- ecInfo = info;
+ assert ecInfo != null;
cellSize = ecInfo.getSchema().getChunkSize();
- dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
- parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
+ dataBlkNum = (short) ecInfo.getSchema().getNumDataUnits();
+ parityBlkNum = (short) ecInfo.getSchema().getNumParityUnits();
DFSClient.LOG.debug("Creating an striped input stream for file " + src);
}
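To make the schema-derived fields above concrete: assuming a 6+3 Reed-Solomon schema with 64 KB cells (representative figures, not read from this commit), the constructor would derive the following:

    public class SchemaDerivedValues {
      public static void main(String[] args) {
        final int cellSize = 64 * 1024;  // assumed getChunkSize()
        final short dataBlkNum = 6;      // assumed getNumDataUnits()
        final short parityBlkNum = 3;    // assumed getNumParityUnits()
        // One full stripe carries dataBlkNum cells of user data, spread
        // over dataBlkNum + parityBlkNum internal blocks:
        System.out.println("stripe = " + (long) cellSize * dataBlkNum
            + " bytes across " + (dataBlkNum + parityBlkNum) + " blocks");
      }
    }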
@@ -162,9 +178,7 @@ public class DFSStripedInputStream extends DFSInputStream {
* When seeking into a new block group, create blockReader for each internal
* block in the group.
*/
- @VisibleForTesting
- private synchronized DatanodeInfo[] blockSeekTo(long target)
- throws IOException {
+ private synchronized void blockSeekTo(long target) throws IOException {
if (target >= getFileLength()) {
throw new IOException("Attempted to read past end of file");
}
@@ -172,18 +186,13 @@ public class DFSStripedInputStream extends DFSInputStream {
// Will be getting a new BlockReader.
closeCurrentBlockReaders();
- // Connect to best DataNode for desired Block, with potential offset
- DatanodeInfo[] chosenNodes = new DatanodeInfo[groupSize];
- int refetchToken = 1; // only need to get a new access token once
- int refetchEncryptionKey = 1; // only need to get a new encryption key once
-
// Compute desired striped block group
LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
-
// Update current position
this.pos = target;
this.blockEnd = targetBlockGroup.getStartOffset() +
targetBlockGroup.getBlockSize() - 1;
+ currentLocatedBlock = targetBlockGroup;
long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
@@ -191,71 +200,50 @@ public class DFSStripedInputStream extends DFSInputStream {
// The purpose is to get start offset into each block
ReadPortion[] readPortions = planReadPortions(groupSize, cellSize,
offsetIntoBlockGroup, 0, 0);
+
+ final ReaderRetryPolicy retry = new ReaderRetryPolicy();
+ for (int i = 0; i < groupSize; i++) {
+ LocatedBlock targetBlock = targetBlocks[i];
+ if (targetBlock != null) {
+ DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
+ if (retval != null) {
+ currentNodes[i] = retval.info;
+ blockReaders[i] = getBlockReaderWithRetry(targetBlock,
+ readPortions[i].startOffsetInBlock,
+ targetBlock.getBlockSize() - readPortions[i].startOffsetInBlock,
+ retval.addr, retval.storageType, retval.info, target, retry);
+ }
+ }
+ }
+ }
+
+ private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock,
+ long offsetInBlock, long length, InetSocketAddress targetAddr,
+ StorageType storageType, DatanodeInfo datanode, long offsetInFile,
+ ReaderRetryPolicy retry) throws IOException {
+ // only need to get a new access token or a new encryption key once
while (true) {
- int i = 0;
- InetSocketAddress targetAddr = null;
try {
- blockReaders = new BlockReader[groupSize];
- for (i = 0; i < groupSize; i++) {
- LocatedBlock targetBlock = targetBlocks[i];
- if (targetBlock == null) {
- continue;
- }
- long offsetIntoBlock = readPortions[i].startOffsetInBlock;
- DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
- chosenNodes[i] = retval.info;
- targetAddr = retval.addr;
- StorageType storageType = retval.storageType;
-
- ExtendedBlock blk = targetBlock.getBlock();
- Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
- CachingStrategy curCachingStrategy;
- boolean shortCircuitForbidden;
- synchronized(infoLock) {
- curCachingStrategy = cachingStrategy;
- shortCircuitForbidden = shortCircuitForbidden();
- }
- blockReaders[i] = new BlockReaderFactory(dfsClient.getConf()).
- setInetSocketAddress(targetAddr).
- setRemotePeerFactory(dfsClient).
- setDatanodeInfo(chosenNodes[i]).
- setStorageType(storageType).
- setFileName(src).
- setBlock(blk).
- setBlockToken(accessToken).
- setStartOffset(offsetIntoBlock).
- setVerifyChecksum(verifyChecksum).
- setClientName(dfsClient.clientName).
- setLength(blk.getNumBytes() - offsetIntoBlock).
- setCachingStrategy(curCachingStrategy).
- setAllowShortCircuitLocalReads(!shortCircuitForbidden).
- setClientCacheContext(dfsClient.getClientContext()).
- setUserGroupInformation(dfsClient.ugi).
- setConfiguration(dfsClient.getConfiguration()).
- build();
- }
- currentLocatedBlock = targetBlockGroup;
- return chosenNodes;
- } catch (IOException ex) {
- // Retry in case of encryption key or token exceptions. Otherwise throw
- // IOException: since each internal block is singly replicated, it's
- // not meaningful trying to locate another replica.
- if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+ return getBlockReader(targetBlock, offsetInBlock, length, targetAddr,
+ storageType, datanode);
+ } catch (IOException e) {
+ if (e instanceof InvalidEncryptionKeyException &&
+ retry.shouldRefetchEncryptionKey()) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
- + " : " + ex);
- // The encryption key used is invalid.
- refetchEncryptionKey--;
+ + " : " + e);
dfsClient.clearDataEncryptionKey();
- } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
- refetchToken--;
- fetchBlockAt(target);
+ retry.refetchEncryptionKey();
+ } else if (retry.shouldRefetchToken() &&
+ tokenRefetchNeeded(e, targetAddr)) {
+ fetchBlockAt(offsetInFile);
+ retry.refetchToken();
} else {
DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
- + ", add to deadNodes and continue. " + ex, ex);
- // Put chosen node into dead list and throw exception
- addToDeadNodes(chosenNodes[i]);
- throw ex;
+ + ", add to deadNodes and continue.", e);
+ // Put chosen node into dead list, continue
+ addToDeadNodes(datanode);
+ return null;
}
}
}
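The refactored loop gives each recovery action at most one attempt: the ReaderRetryPolicy counters start at 1 and are decremented once consumed, and any other failure marks the datanode dead and hands back a null reader rather than aborting the whole stripe, since each internal block is singly replicated. A self-contained sketch of that one-shot pattern, with a simulated connect() standing in for getBlockReader():

    import java.io.IOException;

    public class OneShotRetryDemo {
      private static int attempts = 0;

      // Simulated connect that fails once, standing in for getBlockReader().
      private static String connect() throws IOException {
        if (attempts++ == 0) {
          throw new IOException("stale encryption key");
        }
        return "reader";
      }

      public static void main(String[] args) {
        int fetchKeyTimes = 1;  // mirrors ReaderRetryPolicy's counter
        String reader = null;
        while (true) {
          try {
            reader = connect();
            break;
          } catch (IOException e) {
            if (fetchKeyTimes > 0) {   // shouldRefetchEncryptionKey()
              fetchKeyTimes--;         // refetchEncryptionKey()
            } else {
              reader = null;           // give up on this internal block only
              break;
            }
          }
        }
        System.out.println("reader = " + reader);
      }
    }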
@@ -272,15 +260,15 @@ public class DFSStripedInputStream extends DFSInputStream {
return;
}
for (int i = 0; i < groupSize; i++) {
- if (blockReaders[i] == null) {
- continue;
- }
- try {
- blockReaders[i].close();
- } catch (IOException e) {
- DFSClient.LOG.error("error closing blockReader", e);
+ if (blockReaders[i] != null) {
+ try {
+ blockReaders[i].close();
+ } catch (IOException e) {
+ DFSClient.LOG.error("error closing blockReader", e);
+ }
+ blockReaders[i] = null;
}
- blockReaders[i] = null;
+ currentNodes[i] = null;
}
blockEnd = -1;
}
@@ -292,123 +280,93 @@ public class DFSStripedInputStream extends DFSInputStream {
if (closed.get()) {
throw new IOException("Stream closed");
}
- Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
- = new HashMap<>();
+ Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
failures = 0;
if (pos < getFileLength()) {
- int retries = 2;
/** Index of the target block in a stripe to read from */
int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
- while (retries > 0) {
- try {
- // currentNode can be left as null if previous read had a checksum
- // error on the same block. See HDFS-3067
- if (pos > blockEnd || currentNodes == null) {
- currentNodes = blockSeekTo(pos);
- }
- int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
- synchronized(infoLock) {
- if (locatedBlocks.isLastBlockComplete()) {
- realLen = (int) Math.min(realLen,
- locatedBlocks.getFileLength() - pos);
- }
+ try {
+ if (pos > blockEnd) {
+ blockSeekTo(pos);
+ }
+ int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+ synchronized (infoLock) {
+ if (locatedBlocks.isLastBlockComplete()) {
+ realLen = (int) Math.min(realLen,
+ locatedBlocks.getFileLength() - pos);
}
+ }
- /** Number of bytes already read into buffer */
- int result = 0;
- while (result < realLen) {
- /**
- * Temporary position into the file; {@link pos} might not proceed
- * to this temporary position in case of exceptions.
- */
- long tmpPos = pos + result;
- /** Start and end offsets of a cell in the file */
- long cellStart = (tmpPos / cellSize) * cellSize;
- long cellEnd = cellStart + cellSize - 1;
-
- /** Number of bytes to read from the current cell */
- int realLenInCell = (int) Math.min(realLen - result,
- cellEnd - tmpPos + 1L);
- assert realLenInCell > 0 : "Temporary position shouldn't be " +
- "after cellEnd";
- // Read from one blockReader up to cell boundary
- int cellRet = readBuffer(blockReaders[idxInGroup],
- currentNodes[idxInGroup], strategy, off + result,
- realLenInCell);
- if (cellRet >= 0) {
- result += cellRet;
- if (cellRet < realLenInCell) {
- // A short read indicates the current blockReader buffer is
- // already drained. Should return the read call. Otherwise
- // should proceed to the next cell.
- break;
- }
- } else {
- // got a EOS from reader though we expect more data on it.
- throw new IOException("Unexpected EOS from the reader");
+ /** Number of bytes already read into buffer */
+ int result = 0;
+ while (result < realLen) {
+ /**
+ * Temporary position into the file; {@link pos} might not proceed
+ * to this temporary position in case of exceptions.
+ */
+ long tmpPos = pos + result;
+ /** Start and end offsets of a cell in the file */
+ long cellStart = (tmpPos / cellSize) * cellSize;
+ long cellEnd = cellStart + cellSize - 1;
+
+ /** Number of bytes to read from the current cell */
+ int realLenInCell = (int) Math.min(realLen - result,
+ cellEnd - tmpPos + 1L);
+ assert realLenInCell > 0 : "Temporary position shouldn't be "
+ + "after cellEnd";
+
+ // Read from one blockReader up to cell boundary
+ int cellRet = readBuffer(blockReaders[idxInGroup],
+ currentNodes[idxInGroup], strategy, off + result, realLenInCell,
+ corruptedBlockMap);
+ if (cellRet >= 0) {
+ result += cellRet;
+ if (cellRet < realLenInCell) {
+ // A short read indicates the current blockReader buffer is
+ // already drained. Should return from the read call. Otherwise
+ // should proceed to the next cell.
+ break;
}
- idxInGroup = (idxInGroup + 1) % dataBlkNum;
- }
-
- pos += result;
-
- if (dfsClient.stats != null) {
- dfsClient.stats.incrementBytesRead(result);
- }
- return result;
- } catch (ChecksumException ce) {
- throw ce;
- } catch (IOException e) {
- if (retries == 1) {
- DFSClient.LOG.warn("DFS Read", e);
- }
- blockEnd = -1;
- if (currentNodes[idxInGroup] != null) {
- addToDeadNodes(currentNodes[idxInGroup]);
+ } else {
+ // got an EOS from the reader though we expect more data from it.
+ throw new IOException("Unexpected EOS from the reader");
}
- if (--retries == 0) {
- throw e;
- }
- } finally {
- // Check if need to report block replicas corruption either read
- // was successful or ChecksumException occured.
- reportCheckSumFailure(corruptedBlockMap,
- currentLocatedBlock.getLocations().length);
+ idxInGroup = (idxInGroup + 1) % dataBlkNum;
}
+ pos += result;
+ if (dfsClient.stats != null) {
+ dfsClient.stats.incrementBytesRead(result);
+ }
+ return result;
+ } finally {
+ // Check if we need to report block replica corruption, whether the
+ // read was successful or a ChecksumException occurred.
+ reportCheckSumFailure(corruptedBlockMap,
+ currentLocatedBlock.getLocations().length);
}
}
return -1;
}
private synchronized int readBuffer(BlockReader blockReader,
- DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len)
- throws IOException {
- IOException ioe;
- while (true) {
- try {
- return readerStrategy.doRead(blockReader, off, len);
- } catch ( ChecksumException ce ) {
- DFSClient.LOG.warn("Found Checksum error for "
- + getCurrentBlock() + " from " + currentNode
- + " at " + ce.getPos());
- // If current block group is corrupt, it's meaningless to retry.
- // TODO: this should trigger decoding logic (HDFS-7678)
- throw ce;
- } catch ( IOException e ) {
- ioe = e;
- }
-
- boolean sourceFound = seekToBlockSource(pos);
- if (!sourceFound) {
- throw ioe;
- }
+ DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+ try {
+ return readerStrategy.doRead(blockReader, off, len);
+ } catch ( ChecksumException ce ) {
+ DFSClient.LOG.warn("Found Checksum error for "
+ + getCurrentBlock() + " from " + currentNode
+ + " at " + ce.getPos());
+ // we want to remember which block replicas we have tried
+ addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+ corruptedBlockMap);
+ } catch (IOException e) {
+ DFSClient.LOG.warn("Exception while reading from "
+ + getCurrentBlock() + " of " + src + " from "
+ + currentNode, e);
}
- }
-
- private boolean seekToBlockSource(long targetPos)
- throws IOException {
- currentNodes = blockSeekTo(targetPos);
- return true;
+ // TODO: this should trigger decoding logic (HDFS-7678)
+ return -1;
}
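The cell arithmetic in the read loop above is easier to follow with numbers. Assuming 64 KB cells and 6 data blocks (representative figures, not from this commit), a file position of 200,000 bytes maps as follows:

    public class CellMathDemo {
      public static void main(String[] args) {
        final int cellSize = 64 * 1024;  // assumed
        final int dataBlkNum = 6;        // assumed
        long pos = 200_000L;
        int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);  // 3
        long cellStart = (pos / cellSize) * cellSize;            // 196608
        long cellEnd = cellStart + cellSize - 1;                 // 262143
        // With 50,000 bytes still wanted, the cell boundary does not cap it:
        int realLenInCell = (int) Math.min(50_000L, cellEnd - pos + 1); // 50000
        System.out.println(idxInGroup + " " + cellStart + " "
            + cellEnd + " " + realLenInCell);
      }
    }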
protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
@@ -418,7 +376,7 @@ public class DFSStripedInputStream extends DFSInputStream {
@Override
public int doRead(BlockReader blockReader, int off, int len)
- throws ChecksumException, IOException {
+ throws IOException {
int oldlimit = buf.limit();
if (buf.remaining() > len) {
buf.limit(buf.position() + len);
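The hunk above, shown truncated here, relies on a standard ByteBuffer idiom: temporarily shrinking the limit so a bulk read cannot write past the requested length, with the limit restored once the read completes (the restore falls outside this excerpt). A standalone illustration:

    import java.nio.ByteBuffer;

    public class LimitIdiomDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        int len = 100;                       // caller-requested maximum
        int oldlimit = buf.limit();          // 1024
        if (buf.remaining() > len) {
          buf.limit(buf.position() + len);   // reader now sees only len bytes
        }
        // ... a reader filling buf can now write at most len bytes ...
        buf.limit(oldlimit);                 // restore before returning
        System.out.println("remaining = " + buf.remaining());
      }
    }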