You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/07/08 22:13:27 UTC
svn commit: r1144480 - in /hadoop/common/trunk/hdfs: CHANGES.txt
src/java/org/apache/hadoop/hdfs/DFSInputStream.java
src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
Author: szetszwo
Date: Fri Jul 8 20:13:26 2011
New Revision: 1144480
URL: http://svn.apache.org/viewvc?rev=1144480&view=rev
Log:
HDFS-2034. Length in DFSInputStream.getBlockRange(..) becomes -ve when reading only from a currently being written block. Contributed by John George
Modified:
hadoop/common/trunk/hdfs/CHANGES.txt
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1144480&r1=1144479&r2=1144480&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Fri Jul 8 20:13:26 2011
@@ -808,9 +808,13 @@ Trunk (unreleased changes)
HDFS-2053. Bug in INodeDirectory#computeContentSummary warning.
(Michael Noll via eli)
- HDFS-1990. Fix resource leaks in BlockReceiver.close(). (Uma Maheswara
+ HDFS-1990. Fix resource leaks in BlockReceiver.close(). (Uma Maheswara
Rao G via szetszwo)
+ HDFS-2034. Length in DFSInputStream.getBlockRange(..) becomes -ve when
+ reading only from a currently being written block. (John George via
+ szetszwo)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1144480&r1=1144479&r2=1144480&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Jul 8 20:13:26 2011
@@ -294,8 +294,8 @@ public class DFSInputStream extends FSIn
/**
* Get blocks in the specified range.
- * Fetch them from the namenode if not cached.
- *
+ * Fetch them from the namenode if not cached. This function
+ * will not get a read request beyond the EOF.
* @param offset
* @param length
* @return consecutive segment of located blocks
@@ -304,28 +304,31 @@ public class DFSInputStream extends FSIn
private synchronized List<LocatedBlock> getBlockRange(long offset,
long length)
throws IOException {
+ // getFileLength(): returns total file length
+ // locatedBlocks.getFileLength(): returns length of completed blocks
+ if (offset >= getFileLength()) {
+ throw new IOException("Offset: " + offset +
+ " exceeds file length: " + getFileLength());
+ }
+
final List<LocatedBlock> blocks;
- if (locatedBlocks.isLastBlockComplete()) {
- blocks = getFinalizedBlockRange(offset, length);
+ final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
+ final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
+ final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
+
+ if (readOffsetWithinCompleteBlk) {
+ //get the blocks of finalized (completed) block range
+ blocks = getFinalizedBlockRange(offset,
+ Math.min(length, lengthOfCompleteBlk - offset));
+ } else {
+ blocks = new ArrayList<LocatedBlock>(1);
}
- else {
- final boolean readPastEnd = offset + length > locatedBlocks.getFileLength();
- /* if requested length is greater than current file length
- * then, it could possibly be from the current block being
- * written to. First get the finalized block range and then
- * if necessary, get the length of last block being written
- * to.
- */
- if (readPastEnd)
- length = locatedBlocks.getFileLength() - offset;
-
- blocks = getFinalizedBlockRange(offset, length);
- /* requested length is greater than what finalized blocks
- * have.
- */
- if (readPastEnd)
- blocks.add(locatedBlocks.getLastLocatedBlock());
+
+ // get the blocks from incomplete block range
+ if (readLengthPastCompleteBlk) {
+ blocks.add(locatedBlocks.getLastLocatedBlock());
}
+
return blocks;
}
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java?rev=1144480&r1=1144479&r2=1144480&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java Fri Jul 8 20:13:26 2011
@@ -44,6 +44,7 @@ public class TestWriteRead {
private static final int BUFFER_SIZE = 8192 * 100;
private static final String ROOT_DIR = "/tmp/";
+ private static final long blockSize = 1024*100;
// command-line options. Different defaults for unit test vs real cluster
String filenameOption = ROOT_DIR + "fileX1";
@@ -69,8 +70,8 @@ public class TestWriteRead {
LOG.info("initJunitModeTest");
conf = new HdfsConfiguration();
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 100); // 100K
- // blocksize
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); // 100K
+ // blocksize
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
@@ -99,15 +100,14 @@ public class TestWriteRead {
}
/** Junit Test reading while writing. */
-
@Test
public void testWriteReadSeq() throws IOException {
useFCOption = false;
positionReadOption = false;
String fname = filenameOption;
-
+ long rdBeginPos = 0;
// need to run long enough to fail: takes 25 to 35 sec on Mac
- int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
+ int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE, rdBeginPos);
LOG.info("Summary status from test1: status= " + stat);
Assert.assertEquals(0, stat);
}
@@ -117,14 +117,27 @@ public class TestWriteRead {
public void testWriteReadPos() throws IOException {
String fname = filenameOption;
positionReadOption = true; // position read
- int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
+ long rdBeginPos = 0;
+ int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE, rdBeginPos);
Assert.assertEquals(0, stat);
}
+ /** Junit Test position read of the current block being written. */
+ @Test
+ public void testReadPosCurrentBlock() throws IOException {
+ String fname = filenameOption;
+ positionReadOption = true; // position read
+ int wrChunkSize = (int)(blockSize) + (int)(blockSize/2);
+ long rdBeginPos = blockSize+1;
+ int numTimes=5;
+ int stat = testWriteAndRead(fname, numTimes, wrChunkSize, rdBeginPos);
+ Assert.assertEquals(0, stat);
+ }
// equivalent of TestWriteRead1
private int clusterTestWriteRead1() throws IOException {
- int stat = testWriteAndRead(filenameOption, loopOption, chunkSizeOption);
+ long rdBeginPos = 0;
+ int stat = testWriteAndRead(filenameOption, loopOption, chunkSizeOption, rdBeginPos);
return stat;
}
@@ -133,10 +146,9 @@ public class TestWriteRead {
* Return number of bytes read.
* Support both sequential read and position read.
*/
- private long readData(String fname, byte[] buffer, long byteExpected)
+ private long readData(String fname, byte[] buffer, long byteExpected, long beginPosition)
throws IOException {
long totalByteRead = 0;
- long beginPosition = 0;
Path path = getFullyQualifiedPath(fname);
FSDataInputStream in = null;
@@ -263,7 +275,7 @@ public class TestWriteRead {
* After each iteration of write, do a read of the file from beginning to end.
* Return 0 on success, else the number of failures.
*/
- private int testWriteAndRead(String fname, int loopN, int chunkSize)
+ private int testWriteAndRead(String fname, int loopN, int chunkSize, long readBeginPosition)
throws IOException {
int countOfFailures = 0;
@@ -324,7 +336,7 @@ public class TestWriteRead {
+ ". TotalByteVisible = " + totalByteVisible + " to file "
+ fname);
}
- byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
+ byteVisibleToRead = readData(fname, inBuffer, totalByteVisible, readBeginPosition);
String readmsg = "Written=" + totalByteWritten + " ; Expected Visible="
+ totalByteVisible + " ; Got Visible=" + byteVisibleToRead
@@ -353,7 +365,7 @@ public class TestWriteRead {
out.close();
- byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
+ byteVisibleToRead = readData(fname, inBuffer, totalByteVisible, readBeginPosition);
String readmsg2 = "Written=" + totalByteWritten + " ; Expected Visible="
+ totalByteVisible + " ; Got Visible=" + byteVisibleToRead