You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/08/23 14:59:15 UTC
[1/2] hadoop git commit: HDFS-11738. Hedged pread takes more time
when block moved from initial locations. Contributed by Vinayakumar B.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 a45ffdcdd -> c54c3500e
refs/heads/branch-2.8 f6892f582 -> bc1c8f3e5
HDFS-11738. Hedged pread takes more time when block moved from initial locations. Contributed by Vinayakumar B.
(cherry picked from commit b6bfb2fcb2391d51b8de97c01c1290880779132e)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c54c3500
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c54c3500
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c54c3500
Branch: refs/heads/branch-2
Commit: c54c3500ed720e39926f4cc7ca3db0cb4854ff55
Parents: a45ffdc
Author: John Zhuge <jz...@apache.org>
Authored: Mon Aug 21 13:44:32 2017 -0700
Committer: Vinayakumar B <vi...@apache.org>
Committed: Wed Aug 23 19:44:52 2017 +0530
----------------------------------------------------------------------
.../hadoop/hdfs/DFSClientFaultInjector.java | 2 +
.../org/apache/hadoop/hdfs/DFSInputStream.java | 145 +++++++++++--------
.../java/org/apache/hadoop/hdfs/TestPread.java | 26 +++-
3 files changed, 112 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c54c3500/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 748edcd..b58cf16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -61,4 +61,6 @@ public class DFSClientFaultInjector {
public boolean skipRollingRestartWait() {
return false;
}
+
+ public void sleepBeforeHedgedGet() {}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c54c3500/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 8c8383ed..eb566b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1036,60 +1036,85 @@ public class DFSInputStream extends FSInputStream
private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
+ return chooseDataNode(block, ignoredNodes, true);
+ }
+
+ /**
+ * Choose datanode to read from.
+ *
+ * @param block Block to choose datanode addr from
+ * @param ignoredNodes Ignored nodes inside.
+ * @param refetchIfRequired Whether to refetch if no nodes to choose
+ * from.
+ * @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is
+ * false.
+ */
+ private DNAddrPair chooseDataNode(LocatedBlock block,
+ Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
+ throws IOException {
while (true) {
DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
if (result != null) {
return result;
+ } else if (refetchIfRequired) {
+ block = refetchLocations(block, ignoredNodes);
} else {
- String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
- deadNodes, ignoredNodes);
- String blockInfo = block.getBlock() + " file=" + src;
- if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
- String description = "Could not obtain block: " + blockInfo;
- DFSClient.LOG.warn(description + errMsg
- + ". Throwing a BlockMissingException");
- throw new BlockMissingException(src, description,
- block.getStartOffset());
- }
-
- DatanodeInfo[] nodes = block.getLocations();
- if (nodes == null || nodes.length == 0) {
- DFSClient.LOG.info("No node available for " + blockInfo);
- }
- DFSClient.LOG.info("Could not obtain " + block.getBlock()
- + " from any node: " + errMsg
- + ". Will get new block locations from namenode and retry...");
- try {
- // Introducing a random factor to the wait time before another retry.
- // The wait time is dependent on # of failures and a random factor.
- // At the first time of getting a BlockMissingException, the wait time
- // is a random number between 0..3000 ms. If the first retry
- // still fails, we will wait 3000 ms grace period before the 2nd retry.
- // Also at the second retry, the waiting window is expanded to 6000 ms
- // alleviating the request rate from the server. Similarly the 3rd retry
- // will wait 6000ms grace period before retry and the waiting window is
- // expanded to 9000ms.
- final int timeWindow = dfsClient.getConf().getTimeWindow();
- double waitTime = timeWindow * failures + // grace period for the last round of attempt
- // expanding time window for each failure
- timeWindow * (failures + 1) *
- ThreadLocalRandom.current().nextDouble();
- DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
- " IOException, will wait for " + waitTime + " msec.");
- Thread.sleep((long)waitTime);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException(
- "Interrupted while choosing DataNode for read.");
- }
- deadNodes.clear(); //2nd option is to remove only nodes[blockId]
- openInfo(true);
- block = refreshLocatedBlock(block);
- failures++;
+ return null;
}
}
}
+ private LocatedBlock refetchLocations(LocatedBlock block,
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
+ String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
+ deadNodes, ignoredNodes);
+ String blockInfo = block.getBlock() + " file=" + src;
+ if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
+ String description = "Could not obtain block: " + blockInfo;
+ DFSClient.LOG.warn(description + errMsg
+ + ". Throwing a BlockMissingException");
+ throw new BlockMissingException(src, description,
+ block.getStartOffset());
+ }
+
+ DatanodeInfo[] nodes = block.getLocations();
+ if (nodes == null || nodes.length == 0) {
+ DFSClient.LOG.info("No node available for " + blockInfo);
+ }
+ DFSClient.LOG.info("Could not obtain " + block.getBlock()
+ + " from any node: " + errMsg
+ + ". Will get new block locations from namenode and retry...");
+ try {
+ // Introducing a random factor to the wait time before another retry.
+ // The wait time is dependent on # of failures and a random factor.
+ // At the first time of getting a BlockMissingException, the wait time
+ // is a random number between 0..3000 ms. If the first retry
+ // still fails, we will wait 3000 ms grace period before the 2nd retry.
+ // Also at the second retry, the waiting window is expanded to 6000 ms
+ // alleviating the request rate from the server. Similarly the 3rd retry
+ // will wait 6000ms grace period before retry and the waiting window is
+ // expanded to 9000ms.
+ final int timeWindow = dfsClient.getConf().getTimeWindow();
+ // grace period for the last round of attempt
+ double waitTime = timeWindow * failures +
+ // expanding time window for each failure
+ timeWindow * (failures + 1) *
+ ThreadLocalRandom.current().nextDouble();
+ DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
+ " IOException, will wait for " + waitTime + " msec.");
+ Thread.sleep((long)waitTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException(
+ "Interrupted while choosing DataNode for read.");
+ }
+ deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+ openInfo(true);
+ block = refreshLocatedBlock(block);
+ failures++;
+ return block;
+ }
+
/**
* Get the best node from which to stream the data.
* @param block LocatedBlock, containing nodes in priority order.
@@ -1183,6 +1208,7 @@ public class DFSInputStream extends FSInputStream
return new Callable<ByteBuffer>() {
@Override
public ByteBuffer call() throws Exception {
+ DFSClientFaultInjector.get().sleepBeforeHedgedGet();
byte[] buf = bb.array();
int offset = bb.position();
try (TraceScope ignored = dfsClient.getTracer().
@@ -1385,20 +1411,22 @@ public class DFSInputStream extends FSInputStream
// We are starting up a 'hedged' read. We have a read already
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
// If no nodes to do hedged reads against, pass.
+ boolean refetch = false;
try {
- chosenNode = getBestNodeDNAddrPair(block, ignored);
- if (chosenNode == null) {
- chosenNode = chooseDataNode(block, ignored);
+ chosenNode = chooseDataNode(block, ignored, false);
+ if (chosenNode != null) {
+ // Latest block, if refreshed internally
+ block = chosenNode.block;
+ bb = ByteBuffer.allocate(len);
+ Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+ chosenNode, block, start, end, bb, corruptedBlockMap,
+ hedgedReadId++);
+ Future<ByteBuffer> oneMoreRequest = hedgedService
+ .submit(getFromDataNodeCallable);
+ futures.add(oneMoreRequest);
+ } else {
+ refetch = true;
}
- // Latest block, if refreshed internally
- block = chosenNode.block;
- bb = ByteBuffer.allocate(len);
- Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block, start, end, bb,
- corruptedBlockMap, hedgedReadId++);
- Future<ByteBuffer> oneMoreRequest = hedgedService
- .submit(getFromDataNodeCallable);
- futures.add(oneMoreRequest);
} catch (IOException ioe) {
DFSClient.LOG.debug("Failed getting node for hedged read: {}",
ioe.getMessage());
@@ -1416,6 +1444,9 @@ public class DFSInputStream extends FSInputStream
} catch (InterruptedException ie) {
// Ignore and retry
}
+ if (refetch) {
+ refetchLocations(block, ignored);
+ }
// We got here if exception. Ignore this node on next go around IFF
// we found a chosenNode to hedge read against.
if (chosenNode != null && chosenNode.info != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c54c3500/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index 903ee6c..b33de91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -629,7 +629,7 @@ public class TestPread {
*/
@Test
public void testPreadFailureWithChangedBlockLocations() throws Exception {
- doPreadTestWithChangedLocations();
+ doPreadTestWithChangedLocations(1);
}
/**
@@ -642,21 +642,36 @@ public class TestPread {
* 7. Consider next calls to getBlockLocations() always returns DN3 as last
* location.<br>
*/
- @Test
+ @Test(timeout = 60000)
public void testPreadHedgedFailureWithChangedBlockLocations()
throws Exception {
isHedgedRead = true;
- doPreadTestWithChangedLocations();
+ DFSClientFaultInjector old = DFSClientFaultInjector.get();
+ try {
+ DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+ public void sleepBeforeHedgedGet() {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ }
+ });
+ doPreadTestWithChangedLocations(2);
+ } finally {
+ DFSClientFaultInjector.set(old);
+ }
}
- private void doPreadTestWithChangedLocations()
+ private void doPreadTestWithChangedLocations(int maxFailures)
throws IOException, TimeoutException, InterruptedException {
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
if (isHedgedRead) {
+ conf.setInt(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 100);
conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
+ conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1000);
}
try (MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
@@ -750,6 +765,9 @@ public class TestPread {
int n = din.read(0, buf, 0, data.length());
assertEquals(data.length(), n);
assertEquals("Data should be read", data, new String(buf, 0, n));
+ assertTrue("Read should complete with maximum " + maxFailures
+ + " failures, but completed with " + din.failures,
+ din.failures <= maxFailures);
DFSClient.LOG.info("Read completed");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: HDFS-11738. Hedged pread takes more time
when block moved from initial locations. Contributed by Vinayakumar B.
Posted by vi...@apache.org.
HDFS-11738. Hedged pread takes more time when block moved from initial locations. Contributed by Vinayakumar B.
(cherry picked from commit b6bfb2fcb2391d51b8de97c01c1290880779132e)
(cherry picked from commit c54c3500ed720e39926f4cc7ca3db0cb4854ff55)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc1c8f3e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc1c8f3e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc1c8f3e
Branch: refs/heads/branch-2.8
Commit: bc1c8f3e5ac12e1ef634958496d90c21b6088a30
Parents: f6892f5
Author: John Zhuge <jz...@apache.org>
Authored: Mon Aug 21 13:44:32 2017 -0700
Committer: Vinayakumar B <vi...@apache.org>
Committed: Wed Aug 23 20:05:54 2017 +0530
----------------------------------------------------------------------
.../hadoop/hdfs/DFSClientFaultInjector.java | 2 +
.../org/apache/hadoop/hdfs/DFSInputStream.java | 136 ++++++++++++-------
.../java/org/apache/hadoop/hdfs/TestPread.java | 26 +++-
3 files changed, 108 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1c8f3e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 748edcd..b58cf16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -61,4 +61,6 @@ public class DFSClientFaultInjector {
public boolean skipRollingRestartWait() {
return false;
}
+
+ public void sleepBeforeHedgedGet() {}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1c8f3e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index f99a15b..b8705f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1006,54 +1006,80 @@ public class DFSInputStream extends FSInputStream
private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
+ return chooseDataNode(block, ignoredNodes, true);
+ }
+
+ /**
+ * Choose datanode to read from.
+ *
+ * @param block Block to choose datanode addr from
+ * @param ignoredNodes Ignored nodes inside.
+ * @param refetchIfRequired Whether to refetch if no nodes to choose
+ * from.
+ * @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is
+ * false.
+ */
+ private DNAddrPair chooseDataNode(LocatedBlock block,
+ Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
+ throws IOException {
while (true) {
DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
if (result != null) {
return result;
+ } else if (refetchIfRequired) {
+ block = refetchLocations(block, ignoredNodes);
} else {
- String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
- deadNodes, ignoredNodes);
- String blockInfo = block.getBlock() + " file=" + src;
- if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
- String description = "Could not obtain block: " + blockInfo;
- DFSClient.LOG.warn(description + errMsg
- + ". Throwing a BlockMissingException");
- throw new BlockMissingException(src, description,
- block.getStartOffset());
- }
+ return null;
+ }
+ }
+ }
- DatanodeInfo[] nodes = block.getLocations();
- if (nodes == null || nodes.length == 0) {
- DFSClient.LOG.info("No node available for " + blockInfo);
- }
- DFSClient.LOG.info("Could not obtain " + block.getBlock()
- + " from any node: " + errMsg
- + ". Will get new block locations from namenode and retry...");
- try {
- // Introducing a random factor to the wait time before another retry.
- // The wait time is dependent on # of failures and a random factor.
- // At the first time of getting a BlockMissingException, the wait time
- // is a random number between 0..3000 ms. If the first retry
- // still fails, we will wait 3000 ms grace period before the 2nd retry.
- // Also at the second retry, the waiting window is expanded to 6000 ms
- // alleviating the request rate from the server. Similarly the 3rd retry
- // will wait 6000ms grace period before retry and the waiting window is
- // expanded to 9000ms.
- final int timeWindow = dfsClient.getConf().getTimeWindow();
- double waitTime = timeWindow * failures + // grace period for the last round of attempt
- // expanding time window for each failure
- timeWindow * (failures + 1) *
+ private LocatedBlock refetchLocations(LocatedBlock block,
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
+ String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
+ deadNodes, ignoredNodes);
+ String blockInfo = block.getBlock() + " file=" + src;
+ if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
+ String description = "Could not obtain block: " + blockInfo;
+ DFSClient.LOG.warn(description + errMsg
+ + ". Throwing a BlockMissingException");
+ throw new BlockMissingException(src, description,
+ block.getStartOffset());
+ }
+
+ DatanodeInfo[] nodes = block.getLocations();
+ if (nodes == null || nodes.length == 0) {
+ DFSClient.LOG.info("No node available for " + blockInfo);
+ }
+ DFSClient.LOG.info("Could not obtain " + block.getBlock()
+ + " from any node: " + errMsg
+ + ". Will get new block locations from namenode and retry...");
+ try {
+ // Introducing a random factor to the wait time before another retry.
+ // The wait time is dependent on # of failures and a random factor.
+ // At the first time of getting a BlockMissingException, the wait time
+ // is a random number between 0..3000 ms. If the first retry
+ // still fails, we will wait 3000 ms grace period before the 2nd retry.
+ // Also at the second retry, the waiting window is expanded to 6000 ms
+ // alleviating the request rate from the server. Similarly the 3rd retry
+ // will wait 6000ms grace period before retry and the waiting window is
+ // expanded to 9000ms.
+ final int timeWindow = dfsClient.getConf().getTimeWindow();
+ // grace period for the last round of attempt
+ double waitTime = timeWindow * failures +
+ // expanding time window for each failure
+ timeWindow * (failures + 1) *
ThreadLocalRandom.current().nextDouble();
- DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
- Thread.sleep((long)waitTime);
- } catch (InterruptedException ignored) {
- }
- deadNodes.clear(); //2nd option is to remove only nodes[blockId]
- openInfo(true);
- block = refreshLocatedBlock(block);
- failures++;
- }
+ DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
+ " IOException, will wait for " + waitTime + " msec.");
+ Thread.sleep((long)waitTime);
+ } catch (InterruptedException ignored) {
}
+ deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+ openInfo(true);
+ block = refreshLocatedBlock(block);
+ failures++;
+ return block;
}
/**
@@ -1148,6 +1174,7 @@ public class DFSInputStream extends FSInputStream
return new Callable<ByteBuffer>() {
@Override
public ByteBuffer call() throws Exception {
+ DFSClientFaultInjector.get().sleepBeforeHedgedGet();
byte[] buf = bb.array();
int offset = bb.position();
try (TraceScope ignored = dfsClient.getTracer().
@@ -1341,20 +1368,22 @@ public class DFSInputStream extends FSInputStream
// We are starting up a 'hedged' read. We have a read already
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
// If no nodes to do hedged reads against, pass.
+ boolean refetch = false;
try {
- chosenNode = getBestNodeDNAddrPair(block, ignored);
- if (chosenNode == null) {
- chosenNode = chooseDataNode(block, ignored);
+ chosenNode = chooseDataNode(block, ignored, false);
+ if (chosenNode != null) {
+ // Latest block, if refreshed internally
+ block = chosenNode.block;
+ bb = ByteBuffer.allocate(len);
+ Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+ chosenNode, block, start, end, bb, corruptedBlockMap,
+ hedgedReadId++);
+ Future<ByteBuffer> oneMoreRequest = hedgedService
+ .submit(getFromDataNodeCallable);
+ futures.add(oneMoreRequest);
+ } else {
+ refetch = true;
}
- // Latest block, if refreshed internally
- block = chosenNode.block;
- bb = ByteBuffer.allocate(len);
- Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block, start, end, bb,
- corruptedBlockMap, hedgedReadId++);
- Future<ByteBuffer> oneMoreRequest = hedgedService
- .submit(getFromDataNodeCallable);
- futures.add(oneMoreRequest);
} catch (IOException ioe) {
DFSClient.LOG.debug("Failed getting node for hedged read: {}",
ioe.getMessage());
@@ -1372,6 +1401,9 @@ public class DFSInputStream extends FSInputStream
} catch (InterruptedException ie) {
// Ignore and retry
}
+ if (refetch) {
+ refetchLocations(block, ignored);
+ }
// We got here if exception. Ignore this node on next go around IFF
// we found a chosenNode to hedge read against.
if (chosenNode != null && chosenNode.info != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1c8f3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index e154490..d042ba2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -566,7 +566,7 @@ public class TestPread {
*/
@Test
public void testPreadFailureWithChangedBlockLocations() throws Exception {
- doPreadTestWithChangedLocations();
+ doPreadTestWithChangedLocations(1);
}
/**
@@ -579,21 +579,36 @@ public class TestPread {
* 7. Consider next calls to getBlockLocations() always returns DN3 as last
* location.<br>
*/
- @Test
+ @Test(timeout = 60000)
public void testPreadHedgedFailureWithChangedBlockLocations()
throws Exception {
isHedgedRead = true;
- doPreadTestWithChangedLocations();
+ DFSClientFaultInjector old = DFSClientFaultInjector.get();
+ try {
+ DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+ public void sleepBeforeHedgedGet() {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ }
+ });
+ doPreadTestWithChangedLocations(2);
+ } finally {
+ DFSClientFaultInjector.set(old);
+ }
}
- private void doPreadTestWithChangedLocations()
+ private void doPreadTestWithChangedLocations(int maxFailures)
throws IOException, TimeoutException, InterruptedException {
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
if (isHedgedRead) {
+ conf.setInt(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 100);
conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
+ conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1000);
}
try (MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
@@ -687,6 +702,9 @@ public class TestPread {
int n = din.read(0, buf, 0, data.length());
assertEquals(data.length(), n);
assertEquals("Data should be read", data, new String(buf, 0, n));
+ assertTrue("Read should complete with maximum " + maxFailures
+ + " failures, but completed with " + din.failures,
+ din.failures <= maxFailures);
DFSClient.LOG.info("Read completed");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org