Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2017/02/10 18:28:04 UTC

hadoop git commit: HDFS-11379. DFSInputStream may infinite loop requesting block locations. Contributed by Daryn Sharp.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2b7a7bbe0 -> 07a5184f7


HDFS-11379. DFSInputStream may infinite loop requesting block locations. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/07a5184f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/07a5184f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/07a5184f

Branch: refs/heads/trunk
Commit: 07a5184f74fdeffc42cdaec42ad4378c0e41c541
Parents: 2b7a7bb
Author: Kihwal Lee <ki...@apache.org>
Authored: Fri Feb 10 12:27:08 2017 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Fri Feb 10 12:27:08 2017 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 48 ++++++++----------
 .../java/org/apache/hadoop/hdfs/TestPread.java  | 51 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
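
Background on the bug, for readers of the archive: if a file is truncated (or otherwise shrinks) while a DFSInputStream holds stale cached block locations, the old getBlockRange() loop would notice that no cached block covers curOff, re-request locations from the NameNode, get back an empty block list, insert nothing into the cache, and continue with curOff unchanged -- an endless stream of identical RPCs. The patch funnels all location lookups through fetchBlockAt(), which now throws EOFException when the NameNode returns no block covering the requested offset. Below is a minimal, self-contained sketch of that fail-fast behavior; Block, fetchLocations, and EmptyLocationsDemo are illustrative stand-ins, not the Hadoop types.

    import java.io.EOFException;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    public class EmptyLocationsDemo {
      // Stand-in for LocatedBlock: just an offset range.
      static class Block {
        final long start, size;
        Block(long start, long size) { this.start = start; this.size = size; }
      }

      // Stand-in for the getLocatedBlocks RPC after a truncate: the
      // NameNode no longer has any block covering this offset.
      static List<Block> fetchLocations(long offset) {
        return Collections.emptyList();
      }

      // Mirrors the shape of the patched fetchBlockAt(): an empty answer
      // becomes an EOFException instead of a no-op cache insert and a retry.
      static Block fetchBlockAt(long offset) throws IOException {
        List<Block> blocks = fetchLocations(offset);
        if (blocks.isEmpty()) {
          throw new EOFException("Could not find target position " + offset);
        }
        return blocks.get(0);
      }

      public static void main(String[] args) {
        try {
          fetchBlockAt(512); // read beyond the truncated tail
        } catch (IOException e) {
          System.out.println("fails fast: " + e); // pre-patch: looped forever
        }
      }
    }
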


http://git-wip-us.apache.org/repos/asf/hadoop/blob/07a5184f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 5783f90..39d0eed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -421,33 +421,36 @@ public class DFSInputStream extends FSInputStream
       }
       else {
         // search cached blocks first
-        int targetBlockIdx = locatedBlocks.findBlock(offset);
-        if (targetBlockIdx < 0) { // block is not cached
-          targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
-          // fetch more blocks
-          final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
-          assert (newBlocks != null) : "Could not find target position " + offset;
-          locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
-        }
-        blk = locatedBlocks.get(targetBlockIdx);
+        blk = fetchBlockAt(offset, 0, true);
       }
       return blk;
     }
   }
 
   /** Fetch a block from namenode and cache it */
-  protected void fetchBlockAt(long offset) throws IOException {
+  protected LocatedBlock fetchBlockAt(long offset) throws IOException {
+    return fetchBlockAt(offset, 0, false); // don't use cache
+  }
+
+  /** Fetch a block from namenode and cache it */
+  private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
+      throws IOException {
     synchronized(infoLock) {
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+        useCache = false;
       }
-      // fetch blocks
-      final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
-      if (newBlocks == null) {
-        throw new IOException("Could not find target position " + offset);
+      if (!useCache) { // fetch blocks
+        final LocatedBlocks newBlocks = (length == 0)
+            ? dfsClient.getLocatedBlocks(src, offset)
+            : dfsClient.getLocatedBlocks(src, offset, length);
+        if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
+          throw new EOFException("Could not find target position " + offset);
+        }
+        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
       }
-      locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+      return locatedBlocks.get(targetBlockIdx);
     }
   }
 
@@ -502,28 +505,15 @@ public class DFSInputStream extends FSInputStream
       assert (locatedBlocks != null) : "locatedBlocks is null";
       List<LocatedBlock> blockRange = new ArrayList<>();
       // search cached blocks first
-      int blockIdx = locatedBlocks.findBlock(offset);
-      if (blockIdx < 0) { // block is not cached
-        blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
-      }
       long remaining = length;
       long curOff = offset;
       while(remaining > 0) {
-        LocatedBlock blk = null;
-        if(blockIdx < locatedBlocks.locatedBlockCount())
-          blk = locatedBlocks.get(blockIdx);
-        if (blk == null || curOff < blk.getStartOffset()) {
-          LocatedBlocks newBlocks;
-          newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
-          locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
-          continue;
-        }
+        LocatedBlock blk = fetchBlockAt(curOff, remaining, true);
         assert curOff >= blk.getStartOffset() : "Block not found";
         blockRange.add(blk);
         long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
         remaining -= bytesRead;
         curOff += bytesRead;
-        blockIdx++;
       }
       return blockRange;
     }
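
A note on the arithmetic the rewritten loop relies on: for each block returned, bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff is the number of bytes of the request that block covers, so advancing curOff by it lands exactly on the next block boundary, and remaining going non-positive ends the walk. A worked sketch with illustrative numbers (not Hadoop code; 512-byte blocks at offsets 0, 512, 1024, 1536, reading offset 700 for 900 bytes):

    public class BlockRangeWalk {
      public static void main(String[] args) {
        long curOff = 700, remaining = 900, blockSize = 512;
        for (long start = 0; start < 2048 && remaining > 0; start += blockSize) {
          if (curOff >= start && curOff < start + blockSize) {
            long bytesRead = start + blockSize - curOff; // bytes this block covers
            System.out.printf("block@%d covers %d bytes%n", start, bytesRead);
            remaining -= bytesRead; // 900 -> 576 -> 64 -> -448 (walk ends)
            curOff += bytesRead;    // 700 -> 1024 -> 1536 -> 2048
          }
        }
      }
    }
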

http://git-wip-us.apache.org/repos/asf/hadoop/blob/07a5184f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index c761225..637f2df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Random;
@@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
@@ -42,6 +44,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -491,6 +494,54 @@ public class TestPread {
     }
   }
 
+  @Test
+  public void testTruncateWhileReading() throws Exception {
+    Path path = new Path("/testfile");
+    final int blockSize = 512;
+
+    // prevent initial pre-fetch of multiple block locations
+    Configuration conf = new Configuration();
+    conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY, blockSize);
+
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // create multi-block file
+      FSDataOutputStream dos =
+          fs.create(path, true, blockSize, (short)1, blockSize);
+      dos.write(new byte[blockSize*3]);
+      dos.close();
+      // truncate a file while it's open
+      final FSDataInputStream dis = fs.open(path);
+      while (!fs.truncate(path, 10)) {
+        Thread.sleep(10);
+      }
+      // verify that reading bytes outside the initial pre-fetch do
+      // not send the client into an infinite loop querying locations.
+      ExecutorService executor = Executors.newFixedThreadPool(1);
+      Future<?> future = executor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          // read from 2nd block.
+          dis.readFully(blockSize, new byte[4]);
+          return null;
+        }
+      });
+      try {
+        future.get(4, TimeUnit.SECONDS);
+        Assert.fail();
+      } catch (ExecutionException ee) {
+        assertTrue(ee.toString(), ee.getCause() instanceof EOFException);
+      } finally {
+        future.cancel(true);
+        executor.shutdown();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     new TestPread().testPreadDFS();
   }
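
How the new test exercises the bug: PREFETCH_SIZE_KEY is capped at one block so the open() call caches only the first block's locations, the three-block file is truncated to 10 bytes while the reader is open, and the reader then asks for bytes in the now-nonexistent second block. Before the patch that readFully() never returned, so the test bounds it with a Future and a 4-second timeout; with the patch the cause unwrapped from the ExecutionException is the new EOFException.
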

