Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2017/08/16 17:07:16 UTC

[24/50] [abbrv] hadoop git commit: HDFS-11303. Hedged read might hang infinitely if read data from all DN failed. Contributed by Chen Zhang, Wei-chiu Chuang, and John Zhuge.

HDFS-11303. Hedged read might hang infinitely if read data from all DN failed. Contributed by Chen Zhang, Wei-chiu Chuang, and John Zhuge.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b242f09
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b242f09
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b242f09

Branch: refs/heads/YARN-1011
Commit: 8b242f09a61a7536d2422546bfa6c2aaf1d57ed6
Parents: 28d97b7
Author: John Zhuge <jz...@cloudera.com>
Authored: Thu Aug 10 14:04:36 2017 -0700
Committer: John Zhuge <jz...@apache.org>
Committed: Fri Aug 11 19:42:07 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 11 ++--
 .../java/org/apache/hadoop/hdfs/TestPread.java  | 63 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
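Context for readers trying the patch: hedged reads are off by default, and a client opts in by sizing the hedged-read thread pool and setting the hedge threshold, the same two keys the new test below sets. A minimal client-side sketch, assuming an illustrative file path and values (not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class HedgedReadClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // A pool size > 0 is what enables hedged reads in the DFSClient.
        conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 5);
        // Spawn a second ("hedged") read if the first datanode has not
        // answered within this many milliseconds.
        conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 50);
        try (FileSystem fs = FileSystem.get(conf);
            FSDataInputStream in = fs.open(new Path("/hedgedReadMaxOut.dat"))) {
          byte[] buffer = new byte[1024];
          in.read(0, buffer, 0, buffer.length); // positional read; may hedge
        }
      }
    }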


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b242f09/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index dcc997c..6bff172 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1131,8 +1131,9 @@ public class DFSInputStream extends FSInputStream
         Future<ByteBuffer> firstRequest = hedgedService
             .submit(getFromDataNodeCallable);
         futures.add(firstRequest);
+        Future<ByteBuffer> future = null;
         try {
-          Future<ByteBuffer> future = hedgedService.poll(
+          future = hedgedService.poll(
               conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
           if (future != null) {
             ByteBuffer result = future.get();
@@ -1142,16 +1143,18 @@ public class DFSInputStream extends FSInputStream
           }
           DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
               + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
-          // Ignore this node on next go around.
-          ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
           // continue; no need to refresh block locations
         } catch (ExecutionException e) {
-          // Ignore
+          futures.remove(future);
         } catch (InterruptedException e) {
           throw new InterruptedIOException(
               "Interrupted while waiting for reading task");
         }
+        // Ignore this node on next go around.
+        // If the poll timed out and the request is still ongoing, don't
+        // consider it again. If the read failed, don't consider it either.
+        ignored.add(chosenNode.info);
       } else {
         // We are starting up a 'hedged' read. We have a read already
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.

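The gist of the hunk above: previously an ExecutionException from the first read was silently swallowed, its dead Future stayed in `futures`, and the node was only added to `ignored` when poll() timed out, so a cluster where every datanode fails could leave the loop waiting on futures that would never produce data. The fix drops the failed future and blacklists the node on both paths. A self-contained sketch of the corrected pattern, with a made-up failing task standing in for the datanode read:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class HedgedPollSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> service =
            new ExecutorCompletionService<>(pool);
        List<Future<String>> futures = new ArrayList<>();
        // Stand-in for the first datanode read; it fails immediately.
        futures.add(service.submit(() -> {
          throw new IOException("simulated checksum failure");
        }));

        Future<String> future = null;
        try {
          future = service.poll(50, TimeUnit.MILLISECONDS);
          if (future != null) {
            System.out.println("read ok: " + future.get());
          } else {
            // Timed out with the read still in flight: a real client now
            // submits a hedged read against a different datanode.
            System.out.println("timed out; would spawn a hedged read");
          }
        } catch (ExecutionException e) {
          // The read failed outright. Drop the dead future so later code
          // waiting on `futures` cannot block on it forever.
          futures.remove(future);
          System.out.println("read failed: " + e.getCause());
        }
        // On the timeout path *and* the failure path, the patched code adds
        // the chosen node to `ignored` so the next iteration tries another.
        pool.shutdownNow();
      }
    }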
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b242f09/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index 85fc97b..bcb02b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -59,6 +59,8 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Supplier;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
 
 /**
  * This class tests the DFS positional read functionality in a single node
@@ -72,6 +74,9 @@ public class TestPread {
   boolean simulatedStorage;
   boolean isHedgedRead;
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestPread.class.getName());
+
   @Before
   public void setup() {
     simulatedStorage = false;
@@ -551,6 +556,64 @@ public class TestPread {
     }
   }
 
+  @Test(timeout=30000)
+  public void testHedgedReadFromAllDNFailed() throws IOException {
+    Configuration conf = new Configuration();
+    int numHedgedReadPoolThreads = 5;
+    final int hedgedReadTimeoutMillis = 50;
+
+    conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
+        numHedgedReadPoolThreads);
+    conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
+        hedgedReadTimeoutMillis);
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+    DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        if (true) {
+          LOG.info("-------------- throw Checksum Exception");
+          throw new ChecksumException("ChecksumException test", 100);
+        }
+        return null;
+      }
+    }).when(injector).fetchFromDatanodeException();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    FSDataOutputStream output = null;
+    DFSInputStream input = null;
+    String filename = "/hedgedReadMaxOut.dat";
+    DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
+    // Metrics instance is static, so we need to reset counts from prior tests.
+    metrics.hedgedReadOps.set(0);
+    try {
+      Path file = new Path(filename);
+      output = fileSys.create(file, (short) 2);
+      byte[] data = new byte[64 * 1024];
+      output.write(data);
+      output.flush();
+      output.close();
+      byte[] buffer = new byte[64 * 1024];
+      input = dfsClient.open(filename);
+      input.read(0, buffer, 0, 1024);
+      Assert.fail("Reading the block should have thrown BlockMissingException");
+    } catch (BlockMissingException e) {
+      assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+      assertTrue(metrics.getHedgedReadOps() == 0);
+    } finally {
+      Mockito.reset(injector);
+      IOUtils.cleanupWithLogger(LOG, input);
+      IOUtils.cleanupWithLogger(LOG, output);
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Scenario: 1. Write a file with RF=2, DN1 and DN2<br>
    * 2. Open the stream, Consider Locations are [DN1, DN2] in LocatedBlock.<br>

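A note on the test's assertions: the injector makes every datanode read throw a ChecksumException, so each of the three loop iterations burns one datanode via the now-unconditional ignored.add(...), the loop counter reaches 3, and the read surfaces as a BlockMissingException instead of hanging. hedgedReadOps stays at 0 because incHedgedReadOps() sits on the poll-timeout path, and these injected reads fail well before the 50ms threshold ever fires.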
