You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2017/02/10 18:28:04 UTC
hadoop git commit: HDFS-11379. DFSInputStream may infinite loop
requesting block locations. Contributed by Daryn Sharp.
Repository: hadoop
Updated Branches:
refs/heads/trunk 2b7a7bbe0 -> 07a5184f7
HDFS-11379. DFSInputStream may infinite loop requesting block locations. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/07a5184f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/07a5184f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/07a5184f
Branch: refs/heads/trunk
Commit: 07a5184f74fdeffc42cdaec42ad4378c0e41c541
Parents: 2b7a7bb
Author: Kihwal Lee <ki...@apache.org>
Authored: Fri Feb 10 12:27:08 2017 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Fri Feb 10 12:27:08 2017 -0600
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSInputStream.java | 48 ++++++++----------
.../java/org/apache/hadoop/hdfs/TestPread.java | 51 ++++++++++++++++++++
2 files changed, 70 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/07a5184f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 5783f90..39d0eed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -421,33 +421,36 @@ public class DFSInputStream extends FSInputStream
}
else {
// search cached blocks first
- int targetBlockIdx = locatedBlocks.findBlock(offset);
- if (targetBlockIdx < 0) { // block is not cached
- targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
- // fetch more blocks
- final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
- assert (newBlocks != null) : "Could not find target position " + offset;
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
- }
- blk = locatedBlocks.get(targetBlockIdx);
+ blk = fetchBlockAt(offset, 0, true);
}
return blk;
}
}
/** Fetch a block from namenode and cache it */
- protected void fetchBlockAt(long offset) throws IOException {
+ protected LocatedBlock fetchBlockAt(long offset) throws IOException {
+ return fetchBlockAt(offset, 0, false); // don't use cache
+ }
+
+ /** Fetch a block from namenode and cache it */
+ private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
+ throws IOException {
synchronized(infoLock) {
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+ useCache = false;
}
- // fetch blocks
- final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
- if (newBlocks == null) {
- throw new IOException("Could not find target position " + offset);
+ if (!useCache) { // fetch blocks
+ final LocatedBlocks newBlocks = (length == 0)
+ ? dfsClient.getLocatedBlocks(src, offset)
+ : dfsClient.getLocatedBlocks(src, offset, length);
+ if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
+ throw new EOFException("Could not find target position " + offset);
+ }
+ locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
}
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+ return locatedBlocks.get(targetBlockIdx);
}
}
@@ -502,28 +505,15 @@ public class DFSInputStream extends FSInputStream
assert (locatedBlocks != null) : "locatedBlocks is null";
List<LocatedBlock> blockRange = new ArrayList<>();
// search cached blocks first
- int blockIdx = locatedBlocks.findBlock(offset);
- if (blockIdx < 0) { // block is not cached
- blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
- }
long remaining = length;
long curOff = offset;
while(remaining > 0) {
- LocatedBlock blk = null;
- if(blockIdx < locatedBlocks.locatedBlockCount())
- blk = locatedBlocks.get(blockIdx);
- if (blk == null || curOff < blk.getStartOffset()) {
- LocatedBlocks newBlocks;
- newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
- locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
- continue;
- }
+ LocatedBlock blk = fetchBlockAt(curOff, remaining, true);
assert curOff >= blk.getStartOffset() : "Block not found";
blockRange.add(blk);
long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
remaining -= bytesRead;
curOff += bytesRead;
- blockIdx++;
}
return blockRange;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/07a5184f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index c761225..637f2df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
@@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
@@ -42,6 +44,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -491,6 +494,54 @@ public class TestPread {
}
}
+ @Test
+ public void testTruncateWhileReading() throws Exception {
+ Path path = new Path("/testfile");
+ final int blockSize = 512;
+
+ // prevent initial pre-fetch of multiple block locations
+ Configuration conf = new Configuration();
+ conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY, blockSize);
+
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ try {
+ DistributedFileSystem fs = cluster.getFileSystem();
+ // create multi-block file
+ FSDataOutputStream dos =
+ fs.create(path, true, blockSize, (short)1, blockSize);
+ dos.write(new byte[blockSize*3]);
+ dos.close();
+ // truncate a file while it's open
+ final FSDataInputStream dis = fs.open(path);
+ while (!fs.truncate(path, 10)) {
+ Thread.sleep(10);
+ }
+ // verify that reading bytes outside the initial pre-fetch do
+ // not send the client into an infinite loop querying locations.
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ Future<?> future = executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ // read from 2nd block.
+ dis.readFully(blockSize, new byte[4]);
+ return null;
+ }
+ });
+ try {
+ future.get(4, TimeUnit.SECONDS);
+ Assert.fail();
+ } catch (ExecutionException ee) {
+ assertTrue(ee.toString(), ee.getCause() instanceof EOFException);
+ } finally {
+ future.cancel(true);
+ executor.shutdown();
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
public static void main(String[] args) throws Exception {
new TestPread().testPreadDFS();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org