Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/08/23 14:59:15 UTC

[1/2] hadoop git commit: HDFS-11738. Hedged pread takes more time when block moved from initial locations. Contributed by Vinayakumar B.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 a45ffdcdd -> c54c3500e
  refs/heads/branch-2.8 f6892f582 -> bc1c8f3e5


HDFS-11738. Hedged pread takes more time when block moved from initial locations. Contributed by Vinayakumar B.

(cherry picked from commit b6bfb2fcb2391d51b8de97c01c1290880779132e)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c54c3500
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c54c3500
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c54c3500

Branch: refs/heads/branch-2
Commit: c54c3500ed720e39926f4cc7ca3db0cb4854ff55
Parents: a45ffdc
Author: John Zhuge <jz...@apache.org>
Authored: Mon Aug 21 13:44:32 2017 -0700
Committer: Vinayakumar B <vi...@apache.org>
Committed: Wed Aug 23 19:44:52 2017 +0530

----------------------------------------------------------------------
 .../hadoop/hdfs/DFSClientFaultInjector.java     |   2 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 145 +++++++++++--------
 .../java/org/apache/hadoop/hdfs/TestPread.java  |  26 +++-
 3 files changed, 112 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c54c3500/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 748edcd..b58cf16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -61,4 +61,6 @@ public class DFSClientFaultInjector {
   public boolean skipRollingRestartWait() {
     return false;
   }
+
+  public void sleepBeforeHedgedGet() {}
 }
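
The new sleepBeforeHedgedGet() hook follows the fault-injector pattern already used by this class: production code invokes a no-op on a swappable singleton, and tests install a subclass to inject delays or failures. A minimal, self-contained sketch of the pattern, assuming hypothetical InjectorDemo/FaultInjector names (only the hook name mirrors the patch):

  // Sketch of the fault-injector pattern; InjectorDemo and FaultInjector
  // are hypothetical stand-ins for DFSClientFaultInjector.
  public class InjectorDemo {
    static class FaultInjector {
      private static FaultInjector instance = new FaultInjector();
      public static FaultInjector get() { return instance; }
      public static void set(FaultInjector injector) { instance = injector; }
      // No-op in production; tests override it.
      public void sleepBeforeHedgedGet() {}
    }

    public static void main(String[] args) {
      // A test installs an injector that delays the hedged read path,
      // as TestPread does further down in this patch.
      FaultInjector.set(new FaultInjector() {
        @Override
        public void sleepBeforeHedgedGet() {
          try {
            Thread.sleep(500);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      });
      long start = System.nanoTime();
      FaultInjector.get().sleepBeforeHedgedGet(); // the production call site
      System.out.println("delayed ms: " + (System.nanoTime() - start) / 1_000_000);
    }
  }

Note that TestPread below restores the previous injector in a finally block, so the injected delay does not leak into other tests.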

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c54c3500/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 8c8383ed..eb566b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1036,60 +1036,85 @@ public class DFSInputStream extends FSInputStream
 
   private DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    return chooseDataNode(block, ignoredNodes, true);
+  }
+
+  /**
+   * Choose datanode to read from.
+   *
+   * @param block             Block to choose datanode addr from
+   * @param ignoredNodes      Nodes to ignore.
+   * @param refetchIfRequired Whether to refetch if there are no nodes to
+   *                          choose from.
+   * @return The chosen DNAddrPair; can be null if refetchIfRequired is
+   * false.
+   */
+  private DNAddrPair chooseDataNode(LocatedBlock block,
+      Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
+      throws IOException {
     while (true) {
       DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
       if (result != null) {
         return result;
+      } else if (refetchIfRequired) {
+        block = refetchLocations(block, ignoredNodes);
       } else {
-        String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
-            deadNodes, ignoredNodes);
-        String blockInfo = block.getBlock() + " file=" + src;
-        if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
-          String description = "Could not obtain block: " + blockInfo;
-          DFSClient.LOG.warn(description + errMsg
-              + ". Throwing a BlockMissingException");
-          throw new BlockMissingException(src, description,
-              block.getStartOffset());
-        }
-
-        DatanodeInfo[] nodes = block.getLocations();
-        if (nodes == null || nodes.length == 0) {
-          DFSClient.LOG.info("No node available for " + blockInfo);
-        }
-        DFSClient.LOG.info("Could not obtain " + block.getBlock()
-            + " from any node: " + errMsg
-            + ". Will get new block locations from namenode and retry...");
-        try {
-          // Introducing a random factor to the wait time before another retry.
-          // The wait time is dependent on # of failures and a random factor.
-          // At the first time of getting a BlockMissingException, the wait time
-          // is a random number between 0..3000 ms. If the first retry
-          // still fails, we will wait 3000 ms grace period before the 2nd retry.
-          // Also at the second retry, the waiting window is expanded to 6000 ms
-          // alleviating the request rate from the server. Similarly the 3rd retry
-          // will wait 6000ms grace period before retry and the waiting window is
-          // expanded to 9000ms.
-          final int timeWindow = dfsClient.getConf().getTimeWindow();
-          double waitTime = timeWindow * failures +       // grace period for the last round of attempt
-              // expanding time window for each failure
-              timeWindow * (failures + 1) *
-              ThreadLocalRandom.current().nextDouble();
-          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
-              " IOException, will wait for " + waitTime + " msec.");
-          Thread.sleep((long)waitTime);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new InterruptedIOException(
-              "Interrupted while choosing DataNode for read.");
-        }
-        deadNodes.clear(); //2nd option is to remove only nodes[blockId]
-        openInfo(true);
-        block = refreshLocatedBlock(block);
-        failures++;
+        return null;
       }
     }
   }
 
+  private LocatedBlock refetchLocations(LocatedBlock block,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
+        deadNodes, ignoredNodes);
+    String blockInfo = block.getBlock() + " file=" + src;
+    if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
+      String description = "Could not obtain block: " + blockInfo;
+      DFSClient.LOG.warn(description + errMsg
+          + ". Throwing a BlockMissingException");
+      throw new BlockMissingException(src, description,
+          block.getStartOffset());
+    }
+
+    DatanodeInfo[] nodes = block.getLocations();
+    if (nodes == null || nodes.length == 0) {
+      DFSClient.LOG.info("No node available for " + blockInfo);
+    }
+    DFSClient.LOG.info("Could not obtain " + block.getBlock()
+        + " from any node: " + errMsg
+        + ". Will get new block locations from namenode and retry...");
+    try {
+      // Introducing a random factor to the wait time before another retry.
+      // The wait time is dependent on # of failures and a random factor.
+      // At the first time of getting a BlockMissingException, the wait time
+      // is a random number between 0..3000 ms. If the first retry
+      // still fails, we will wait 3000 ms grace period before the 2nd retry.
+      // Also at the second retry, the waiting window is expanded to 6000 ms
+      // alleviating the request rate from the server. Similarly the 3rd retry
+      // will wait 6000ms grace period before retry and the waiting window is
+      // expanded to 9000ms.
+      final int timeWindow = dfsClient.getConf().getTimeWindow();
+      // grace period for the last round of attempt
+      double waitTime = timeWindow * failures +
+          // expanding time window for each failure
+          timeWindow * (failures + 1) *
+          ThreadLocalRandom.current().nextDouble();
+      DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
+          " IOException, will wait for " + waitTime + " msec.");
+      Thread.sleep((long)waitTime);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new InterruptedIOException(
+          "Interrupted while choosing DataNode for read.");
+    }
+    deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+    openInfo(true);
+    block = refreshLocatedBlock(block);
+    failures++;
+    return block;
+  }
+
   /**
    * Get the best node from which to stream the data.
    * @param block LocatedBlock, containing nodes in priority order.
@@ -1183,6 +1208,7 @@ public class DFSInputStream extends FSInputStream
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
+        DFSClientFaultInjector.get().sleepBeforeHedgedGet();
         byte[] buf = bb.array();
         int offset = bb.position();
         try (TraceScope ignored = dfsClient.getTracer().
@@ -1385,20 +1411,22 @@ public class DFSInputStream extends FSInputStream
         // We are starting up a 'hedged' read. We have a read already
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
         // If no nodes to do hedged reads against, pass.
+        boolean refetch = false;
         try {
-          chosenNode = getBestNodeDNAddrPair(block, ignored);
-          if (chosenNode == null) {
-            chosenNode = chooseDataNode(block, ignored);
+          chosenNode = chooseDataNode(block, ignored, false);
+          if (chosenNode != null) {
+            // Latest block, if refreshed internally
+            block = chosenNode.block;
+            bb = ByteBuffer.allocate(len);
+            Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+                chosenNode, block, start, end, bb, corruptedBlockMap,
+                hedgedReadId++);
+            Future<ByteBuffer> oneMoreRequest = hedgedService
+                .submit(getFromDataNodeCallable);
+            futures.add(oneMoreRequest);
+          } else {
+            refetch = true;
           }
-          // Latest block, if refreshed internally
-          block = chosenNode.block;
-          bb = ByteBuffer.allocate(len);
-          Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-              chosenNode, block, start, end, bb,
-              corruptedBlockMap, hedgedReadId++);
-          Future<ByteBuffer> oneMoreRequest = hedgedService
-              .submit(getFromDataNodeCallable);
-          futures.add(oneMoreRequest);
         } catch (IOException ioe) {
           DFSClient.LOG.debug("Failed getting node for hedged read: {}",
               ioe.getMessage());
@@ -1416,6 +1444,9 @@ public class DFSInputStream extends FSInputStream
         } catch (InterruptedException ie) {
           // Ignore and retry
         }
+        if (refetch) {
+          refetchLocations(block, ignored);
+        }
         // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
         if (chosenNode != null && chosenNode.info != null) {
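
Two things stand out in this hunk. First, the hedged-read loop now calls chooseDataNode(block, ignored, false): with refetchIfRequired == false the method returns null instead of blocking on a namenode refetch, so the reader first waits on the already-submitted future and only refetches locations afterwards (the refetch flag). Second, the wait computed in refetchLocations() is timeWindow * failures (a fixed grace period) plus a uniformly random share of timeWindow * (failures + 1) (the expanding window). A standalone sketch of that arithmetic, assuming the default 3000 ms retry window (BackoffDemo and chooseWaitTime are hypothetical names):

  import java.util.concurrent.ThreadLocalRandom;

  // Sketch of the backoff arithmetic from refetchLocations() above.
  public class BackoffDemo {
    static double chooseWaitTime(int timeWindow, int failures) {
      // fixed grace period for the previous rounds of attempts
      return timeWindow * failures
          // expanding random window for this round
          + timeWindow * (failures + 1)
              * ThreadLocalRandom.current().nextDouble();
    }

    public static void main(String[] args) {
      int timeWindow = 3000; // default retry window base, in ms
      for (int failures = 0; failures < 3; failures++) {
        // failures=0 -> 0..3000 ms, failures=1 -> 3000..9000 ms,
        // failures=2 -> 6000..15000 ms
        System.out.printf("failures=%d waitMs=%.0f%n",
            failures, chooseWaitTime(timeWindow, failures));
      }
    }
  }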

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c54c3500/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index 903ee6c..b33de91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -629,7 +629,7 @@ public class TestPread {
    */
   @Test
   public void testPreadFailureWithChangedBlockLocations() throws Exception {
-    doPreadTestWithChangedLocations();
+    doPreadTestWithChangedLocations(1);
   }
 
   /**
@@ -642,21 +642,36 @@ public class TestPread {
    * 7. Consider next calls to getBlockLocations() always returns DN3 as last
    * location.<br>
    */
-  @Test
+  @Test(timeout = 60000)
   public void testPreadHedgedFailureWithChangedBlockLocations()
       throws Exception {
     isHedgedRead = true;
-    doPreadTestWithChangedLocations();
+    DFSClientFaultInjector old = DFSClientFaultInjector.get();
+    try {
+      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+        public void sleepBeforeHedgedGet() {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+          }
+        }
+      });
+      doPreadTestWithChangedLocations(2);
+    } finally {
+      DFSClientFaultInjector.set(old);
+    }
   }
 
-  private void doPreadTestWithChangedLocations()
+  private void doPreadTestWithChangedLocations(int maxFailures)
       throws IOException, TimeoutException, InterruptedException {
     GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     if (isHedgedRead) {
+      conf.setInt(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 100);
       conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
+      conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1000);
     }
     try (MiniDFSCluster cluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
@@ -750,6 +765,9 @@ public class TestPread {
       int n = din.read(0, buf, 0, data.length());
       assertEquals(data.length(), n);
       assertEquals("Data should be read", data, new String(buf, 0, n));
+      assertTrue("Read should complete with maximum " + maxFailures
+              + " failures, but completed with " + din.failures,
+          din.failures <= maxFailures);
       DFSClient.LOG.info("Read completed");
     }
   }
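
The tightened configuration makes the hedged race deterministic enough to assert on din.failures: the hedge fires after 100 ms, the injected 500 ms sleep keeps the first read outstanding, and a 1 s retry window keeps the refetch backoff short. A sketch of what each key controls (HedgedReadTestConf is a hypothetical helper; the keys and values are the ones set above):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

  // Sketch of the client tuning used by the hedged-read test above.
  public class HedgedReadTestConf {
    public static Configuration create() {
      Configuration conf = new Configuration();
      // Launch a hedged read if the first read has not returned in 100 ms.
      conf.setInt(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 100);
      // Two threads: one for the original read, one for the hedge.
      conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
      // Shrink the retry window so refetchLocations() backs off quickly.
      conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1000);
      return conf;
    }
  }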




[2/2] hadoop git commit: HDFS-11738. Hedged pread takes more time when block moved from initial locations. Contributed by Vinayakumar B.

Posted by vi...@apache.org.
HDFS-11738. Hedged pread takes more time when block moved from initial locations. Contributed by Vinayakumar B.

(cherry picked from commit b6bfb2fcb2391d51b8de97c01c1290880779132e)
(cherry picked from commit c54c3500ed720e39926f4cc7ca3db0cb4854ff55)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc1c8f3e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc1c8f3e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc1c8f3e

Branch: refs/heads/branch-2.8
Commit: bc1c8f3e5ac12e1ef634958496d90c21b6088a30
Parents: f6892f5
Author: John Zhuge <jz...@apache.org>
Authored: Mon Aug 21 13:44:32 2017 -0700
Committer: Vinayakumar B <vi...@apache.org>
Committed: Wed Aug 23 20:05:54 2017 +0530

----------------------------------------------------------------------
 .../hadoop/hdfs/DFSClientFaultInjector.java     |   2 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 136 ++++++++++++-------
 .../java/org/apache/hadoop/hdfs/TestPread.java  |  26 +++-
 3 files changed, 108 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1c8f3e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 748edcd..b58cf16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -61,4 +61,6 @@ public class DFSClientFaultInjector {
   public boolean skipRollingRestartWait() {
     return false;
   }
+
+  public void sleepBeforeHedgedGet() {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1c8f3e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index f99a15b..b8705f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1006,54 +1006,80 @@ public class DFSInputStream extends FSInputStream
 
   private DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    return chooseDataNode(block, ignoredNodes, true);
+  }
+
+  /**
+   * Choose datanode to read from.
+   *
+   * @param block             Block to choose datanode addr from
+   * @param ignoredNodes      Nodes to ignore.
+   * @param refetchIfRequired Whether to refetch if there are no nodes to
+   *                          choose from.
+   * @return The chosen DNAddrPair; can be null if refetchIfRequired is
+   * false.
+   */
+  private DNAddrPair chooseDataNode(LocatedBlock block,
+      Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
+      throws IOException {
     while (true) {
       DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
       if (result != null) {
         return result;
+      } else if (refetchIfRequired) {
+        block = refetchLocations(block, ignoredNodes);
       } else {
-        String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
-            deadNodes, ignoredNodes);
-        String blockInfo = block.getBlock() + " file=" + src;
-        if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
-          String description = "Could not obtain block: " + blockInfo;
-          DFSClient.LOG.warn(description + errMsg
-              + ". Throwing a BlockMissingException");
-          throw new BlockMissingException(src, description,
-              block.getStartOffset());
-        }
+        return null;
+      }
+    }
+  }
 
-        DatanodeInfo[] nodes = block.getLocations();
-        if (nodes == null || nodes.length == 0) {
-          DFSClient.LOG.info("No node available for " + blockInfo);
-        }
-        DFSClient.LOG.info("Could not obtain " + block.getBlock()
-            + " from any node: " + errMsg
-            + ". Will get new block locations from namenode and retry...");
-        try {
-          // Introducing a random factor to the wait time before another retry.
-          // The wait time is dependent on # of failures and a random factor.
-          // At the first time of getting a BlockMissingException, the wait time
-          // is a random number between 0..3000 ms. If the first retry
-          // still fails, we will wait 3000 ms grace period before the 2nd retry.
-          // Also at the second retry, the waiting window is expanded to 6000 ms
-          // alleviating the request rate from the server. Similarly the 3rd retry
-          // will wait 6000ms grace period before retry and the waiting window is
-          // expanded to 9000ms.
-          final int timeWindow = dfsClient.getConf().getTimeWindow();
-          double waitTime = timeWindow * failures +       // grace period for the last round of attempt
-              // expanding time window for each failure
-              timeWindow * (failures + 1) *
+  private LocatedBlock refetchLocations(LocatedBlock block,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
+        deadNodes, ignoredNodes);
+    String blockInfo = block.getBlock() + " file=" + src;
+    if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
+      String description = "Could not obtain block: " + blockInfo;
+      DFSClient.LOG.warn(description + errMsg
+          + ". Throwing a BlockMissingException");
+      throw new BlockMissingException(src, description,
+          block.getStartOffset());
+    }
+
+    DatanodeInfo[] nodes = block.getLocations();
+    if (nodes == null || nodes.length == 0) {
+      DFSClient.LOG.info("No node available for " + blockInfo);
+    }
+    DFSClient.LOG.info("Could not obtain " + block.getBlock()
+        + " from any node: " + errMsg
+        + ". Will get new block locations from namenode and retry...");
+    try {
+      // Introducing a random factor to the wait time before another retry.
+      // The wait time is dependent on # of failures and a random factor.
+      // At the first time of getting a BlockMissingException, the wait time
+      // is a random number between 0..3000 ms. If the first retry
+      // still fails, we will wait 3000 ms grace period before the 2nd retry.
+      // Also at the second retry, the waiting window is expanded to 6000 ms
+      // alleviating the request rate from the server. Similarly the 3rd retry
+      // will wait 6000ms grace period before retry and the waiting window is
+      // expanded to 9000ms.
+      final int timeWindow = dfsClient.getConf().getTimeWindow();
+      // grace period for the last round of attempt
+      double waitTime = timeWindow * failures +
+          // expanding time window for each failure
+          timeWindow * (failures + 1) *
               ThreadLocalRandom.current().nextDouble();
-          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
-          Thread.sleep((long)waitTime);
-        } catch (InterruptedException ignored) {
-        }
-        deadNodes.clear(); //2nd option is to remove only nodes[blockId]
-        openInfo(true);
-        block = refreshLocatedBlock(block);
-        failures++;
-      }
+      DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
+          " IOException, will wait for " + waitTime + " msec.");
+      Thread.sleep((long)waitTime);
+    } catch (InterruptedException ignored) {
     }
+    deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+    openInfo(true);
+    block = refreshLocatedBlock(block);
+    failures++;
+    return block;
   }
 
   /**
@@ -1148,6 +1174,7 @@ public class DFSInputStream extends FSInputStream
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
+        DFSClientFaultInjector.get().sleepBeforeHedgedGet();
         byte[] buf = bb.array();
         int offset = bb.position();
         try (TraceScope ignored = dfsClient.getTracer().
@@ -1341,20 +1368,22 @@ public class DFSInputStream extends FSInputStream
         // We are starting up a 'hedged' read. We have a read already
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
         // If no nodes to do hedged reads against, pass.
+        boolean refetch = false;
         try {
-          chosenNode = getBestNodeDNAddrPair(block, ignored);
-          if (chosenNode == null) {
-            chosenNode = chooseDataNode(block, ignored);
+          chosenNode = chooseDataNode(block, ignored, false);
+          if (chosenNode != null) {
+            // Latest block, if refreshed internally
+            block = chosenNode.block;
+            bb = ByteBuffer.allocate(len);
+            Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+                chosenNode, block, start, end, bb, corruptedBlockMap,
+                hedgedReadId++);
+            Future<ByteBuffer> oneMoreRequest = hedgedService
+                .submit(getFromDataNodeCallable);
+            futures.add(oneMoreRequest);
+          } else {
+            refetch = true;
           }
-          // Latest block, if refreshed internally
-          block = chosenNode.block;
-          bb = ByteBuffer.allocate(len);
-          Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-              chosenNode, block, start, end, bb,
-              corruptedBlockMap, hedgedReadId++);
-          Future<ByteBuffer> oneMoreRequest = hedgedService
-              .submit(getFromDataNodeCallable);
-          futures.add(oneMoreRequest);
         } catch (IOException ioe) {
           DFSClient.LOG.debug("Failed getting node for hedged read: {}",
               ioe.getMessage());
@@ -1372,6 +1401,9 @@ public class DFSInputStream extends FSInputStream
         } catch (InterruptedException ie) {
           // Ignore and retry
         }
+        if (refetch) {
+          refetchLocations(block, ignored);
+        }
         // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
         if (chosenNode != null && chosenNode.info != null) {
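
One substantive difference from the branch-2 version above: this branch-2.8 hunk keeps the older catch (InterruptedException ignored) {} in refetchLocations(), silently swallowing the interrupt, while branch-2 restores the interrupt status and rethrows. For comparison, a minimal sketch of the branch-2 style (SleepHelper is a hypothetical name):

  import java.io.InterruptedIOException;

  // Sketch of branch-2's interrupt handling in refetchLocations().
  public class SleepHelper {
    static void sleepInterruptibly(long millis) throws InterruptedIOException {
      try {
        Thread.sleep(millis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore interrupt status
        throw new InterruptedIOException(
            "Interrupted while choosing DataNode for read.");
      }
    }
  }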

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1c8f3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index e154490..d042ba2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -566,7 +566,7 @@ public class TestPread {
    */
   @Test
   public void testPreadFailureWithChangedBlockLocations() throws Exception {
-    doPreadTestWithChangedLocations();
+    doPreadTestWithChangedLocations(1);
   }
 
   /**
@@ -579,21 +579,36 @@ public class TestPread {
    * 7. Consider next calls to getBlockLocations() always returns DN3 as last
    * location.<br>
    */
-  @Test
+  @Test(timeout = 60000)
   public void testPreadHedgedFailureWithChangedBlockLocations()
       throws Exception {
     isHedgedRead = true;
-    doPreadTestWithChangedLocations();
+    DFSClientFaultInjector old = DFSClientFaultInjector.get();
+    try {
+      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+        public void sleepBeforeHedgedGet() {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+          }
+        }
+      });
+      doPreadTestWithChangedLocations(2);
+    } finally {
+      DFSClientFaultInjector.set(old);
+    }
   }
 
-  private void doPreadTestWithChangedLocations()
+  private void doPreadTestWithChangedLocations(int maxFailures)
       throws IOException, TimeoutException, InterruptedException {
     GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     if (isHedgedRead) {
+      conf.setInt(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 100);
       conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
+      conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1000);
     }
     try (MiniDFSCluster cluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
@@ -687,6 +702,9 @@ public class TestPread {
       int n = din.read(0, buf, 0, data.length());
       assertEquals(data.length(), n);
       assertEquals("Data should be read", data, new String(buf, 0, n));
+      assertTrue("Read should complete with maximum " + maxFailures
+              + " failures, but completed with " + din.failures,
+          din.failures <= maxFailures);
       DFSClient.LOG.info("Read completed");
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org