You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/01 09:01:49 UTC
[32/50] [abbrv] hadoop git commit: HDFS-8280. Code Cleanup in DFSInputStream. Contributed by Jing Zhao.
HDFS-8280. Code Cleanup in DFSInputStream. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/439614b0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/439614b0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/439614b0
Branch: refs/heads/HDFS-7240
Commit: 439614b0c8a3df3d8b7967451c5331a0e034e13a
Parents: c79e7f7
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Apr 28 18:11:59 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Apr 28 18:11:59 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../org/apache/hadoop/hdfs/DFSInputStream.java | 141 ++++++++-----------
2 files changed, 61 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/439614b0/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1c4cfb4..e7fa8fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -478,6 +478,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8176. Record from/to snapshots in audit log for snapshot diff report.
(J. Andreina via jing9)
+ HDFS-8280. Code Cleanup in DFSInputStream. (Jing Zhao via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/439614b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 3f90397..3290223 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -601,7 +601,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
targetBlock.getBlockSize() - 1;
this.currentLocatedBlock = targetBlock;
- assert (target==pos) : "Wrong postion " + pos + " expect " + target;
long offsetIntoBlock = target - targetBlock.getStartOffset();
DNAddrPair retval = chooseDataNode(targetBlock, null);
@@ -610,35 +609,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
StorageType storageType = retval.storageType;
try {
- ExtendedBlock blk = targetBlock.getBlock();
- Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
- CachingStrategy curCachingStrategy;
- boolean shortCircuitForbidden;
- synchronized(infoLock) {
- curCachingStrategy = cachingStrategy;
- shortCircuitForbidden = shortCircuitForbidden();
- }
- blockReader = new BlockReaderFactory(dfsClient.getConf()).
- setInetSocketAddress(targetAddr).
- setRemotePeerFactory(dfsClient).
- setDatanodeInfo(chosenNode).
- setStorageType(storageType).
- setFileName(src).
- setBlock(blk).
- setBlockToken(accessToken).
- setStartOffset(offsetIntoBlock).
- setVerifyChecksum(verifyChecksum).
- setClientName(dfsClient.clientName).
- setLength(blk.getNumBytes() - offsetIntoBlock).
- setCachingStrategy(curCachingStrategy).
- setAllowShortCircuitLocalReads(!shortCircuitForbidden).
- setClientCacheContext(dfsClient.getClientContext()).
- setUserGroupInformation(dfsClient.ugi).
- setConfiguration(dfsClient.getConfiguration()).
- build();
+ blockReader = getBlockReader(targetBlock, offsetIntoBlock,
+ targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
+ storageType, chosenNode);
if(connectFailedOnce) {
DFSClient.LOG.info("Successfully connected to " + targetAddr +
- " for " + blk);
+ " for " + targetBlock.getBlock());
}
return chosenNode;
} catch (IOException ex) {
@@ -663,6 +639,37 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
+ protected BlockReader getBlockReader(LocatedBlock targetBlock,
+ long offsetInBlock, long length, InetSocketAddress targetAddr,
+ StorageType storageType, DatanodeInfo datanode) throws IOException {
+ ExtendedBlock blk = targetBlock.getBlock();
+ Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+ CachingStrategy curCachingStrategy;
+ boolean shortCircuitForbidden;
+ synchronized (infoLock) {
+ curCachingStrategy = cachingStrategy;
+ shortCircuitForbidden = shortCircuitForbidden();
+ }
+ return new BlockReaderFactory(dfsClient.getConf()).
+ setInetSocketAddress(targetAddr).
+ setRemotePeerFactory(dfsClient).
+ setDatanodeInfo(datanode).
+ setStorageType(storageType).
+ setFileName(src).
+ setBlock(blk).
+ setBlockToken(accessToken).
+ setStartOffset(offsetInBlock).
+ setVerifyChecksum(verifyChecksum).
+ setClientName(dfsClient.clientName).
+ setLength(length).
+ setCachingStrategy(curCachingStrategy).
+ setAllowShortCircuitLocalReads(!shortCircuitForbidden).
+ setClientCacheContext(dfsClient.getClientContext()).
+ setUserGroupInformation(dfsClient.ugi).
+ setConfiguration(dfsClient.getConfiguration()).
+ build();
+ }
+
/**
* Close it down!
*/
@@ -935,9 +942,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) {
- try {
- return getBestNodeDNAddrPair(block, ignoredNodes);
- } catch (IOException ie) {
+ DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
+ if (result != null) {
+ return result;
+ } else {
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
@@ -954,7 +962,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
DFSClient.LOG.info("No node available for " + blockInfo);
}
DFSClient.LOG.info("Could not obtain " + block.getBlock()
- + " from any node: " + ie + errMsg
+ + " from any node: " + errMsg
+ ". Will get new block locations from namenode and retry...");
try {
// Introducing a random factor to the wait time before another retry.
@@ -977,7 +985,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
openInfo();
block = getBlockAt(block.getStartOffset());
failures++;
- continue;
}
}
}
@@ -986,11 +993,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Get the best node from which to stream the data.
* @param block LocatedBlock, containing nodes in priority order.
* @param ignoredNodes Do not choose nodes in this array (may be null)
- * @return The DNAddrPair of the best node.
- * @throws IOException
+ * @return The DNAddrPair of the best node. Null if no node can be chosen.
*/
private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
- Collection<DatanodeInfo> ignoredNodes) throws IOException {
+ Collection<DatanodeInfo> ignoredNodes) {
DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
DatanodeInfo chosenNode = null;
@@ -1010,9 +1016,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
if (chosenNode == null) {
- throw new IOException("No live nodes contain block " + block.getBlock() +
+ DFSClient.LOG.warn("No live nodes contain block " + block.getBlock() +
" after checking nodes = " + Arrays.toString(nodes) +
", ignoredNodes = " + ignoredNodes);
+ return null;
}
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
@@ -1102,40 +1109,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
- CachingStrategy curCachingStrategy;
- boolean allowShortCircuitLocalReads;
LocatedBlock block = getBlockAt(blockStartOffset);
- synchronized(infoLock) {
- curCachingStrategy = cachingStrategy;
- allowShortCircuitLocalReads = !shortCircuitForbidden();
- }
- DatanodeInfo chosenNode = datanode.info;
- InetSocketAddress targetAddr = datanode.addr;
- StorageType storageType = datanode.storageType;
BlockReader reader = null;
-
try {
DFSClientFaultInjector.get().fetchFromDatanodeException();
- Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
int len = (int) (end - start + 1);
- reader = new BlockReaderFactory(dfsClient.getConf()).
- setInetSocketAddress(targetAddr).
- setRemotePeerFactory(dfsClient).
- setDatanodeInfo(chosenNode).
- setStorageType(storageType).
- setFileName(src).
- setBlock(block.getBlock()).
- setBlockToken(blockToken).
- setStartOffset(start).
- setVerifyChecksum(verifyChecksum).
- setClientName(dfsClient.clientName).
- setLength(len).
- setCachingStrategy(curCachingStrategy).
- setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
- setClientCacheContext(dfsClient.getClientContext()).
- setUserGroupInformation(dfsClient.ugi).
- setConfiguration(dfsClient.getConfiguration()).
- build();
+ reader = getBlockReader(block, start, len, datanode.addr,
+ datanode.storageType, datanode.info);
int nread = reader.readAll(buf, offset, len);
updateReadStatistics(readStatistics, nread, reader);
@@ -1148,34 +1128,33 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} catch (ChecksumException e) {
String msg = "fetchBlockByteRange(). Got a checksum exception for "
+ src + " at " + block.getBlock() + ":" + e.getPos() + " from "
- + chosenNode;
+ + datanode.info;
DFSClient.LOG.warn(msg);
// we want to remember what we have tried
- addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
- addToDeadNodes(chosenNode);
+ addIntoCorruptedBlockMap(block.getBlock(), datanode.info,
+ corruptedBlockMap);
+ addToDeadNodes(datanode.info);
throw new IOException(msg);
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
- + "encryption key was invalid when connecting to " + targetAddr
+ + "encryption key was invalid when connecting to " + datanode.addr
+ " : " + e);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
- continue;
- } else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
+ } else if (refetchToken > 0 && tokenRefetchNeeded(e, datanode.addr)) {
refetchToken--;
try {
fetchBlockAt(block.getStartOffset());
} catch (IOException fbae) {
// ignore IOE, since we can retry it later in a loop
}
- continue;
} else {
- String msg = "Failed to connect to " + targetAddr + " for file "
+ String msg = "Failed to connect to " + datanode.addr + " for file "
+ src + " for block " + block.getBlock() + ":" + e;
DFSClient.LOG.warn("Connection failure: " + msg, e);
- addToDeadNodes(chosenNode);
+ addToDeadNodes(datanode.info);
throw new IOException(msg);
}
} finally {
@@ -1187,10 +1166,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
- * Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
- * int, Map)} except we start up a second, parallel, 'hedged' read
- * if the first read is taking longer than configured amount of
- * time. We then wait on which ever read returns first.
+ * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
+ * 'hedged' read if the first read is taking longer than configured amount of
+ * time. We then wait on which ever read returns first.
*/
private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
long end, byte[] buf, int offset,
@@ -1248,9 +1226,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
// If no nodes to do hedged reads against, pass.
try {
- try {
- chosenNode = getBestNodeDNAddrPair(block, ignored);
- } catch (IOException ioe) {
+ chosenNode = getBestNodeDNAddrPair(block, ignored);
+ if (chosenNode == null) {
chosenNode = chooseDataNode(block, ignored);
}
bb = ByteBuffer.allocate(len);