Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/07/08 22:13:27 UTC

svn commit: r1144480 - in /hadoop/common/trunk/hdfs: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSInputStream.java src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java

Author: szetszwo
Date: Fri Jul  8 20:13:26 2011
New Revision: 1144480

URL: http://svn.apache.org/viewvc?rev=1144480&view=rev
Log:
HDFS-2034. Length in DFSInputStream.getBlockRange(..) becomes -ve when reading only from a currently being written block. Contributed by John George

Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
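
For context on the log message above, here is a minimal sketch of the arithmetic behind HDFS-2034 (illustrative numbers and variable names, not from the patch). Before this change, getBlockRange(..) clamped the requested length to the length of the completed blocks whenever a read extended past them, even when the read began entirely inside the block currently being written; in that case the clamped length came out negative.

    // Illustrative numbers: assume a 100K block size, one finalized block,
    // and a second block currently being written.
    long lengthOfCompleteBlk = 100 * 1024;  // locatedBlocks.getFileLength()
    long offset = 120 * 1024;               // read starts past the finalized block
    long length = 8 * 1024;

    // Old logic: offset + length > lengthOfCompleteBlk, so the length was
    // recomputed against the finalized blocks only:
    long clamped = lengthOfCompleteBlk - offset;  // -20480, i.e. negative

The patch below asks for the finalized block range only when the offset actually falls within the completed blocks, and appends the last (under-construction) block separately.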

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1144480&r1=1144479&r2=1144480&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Fri Jul  8 20:13:26 2011
@@ -808,9 +808,13 @@ Trunk (unreleased changes)
     HDFS-2053. Bug in INodeDirectory#computeContentSummary warning.
     (Michael Noll via eli)
 
-    HDFS-1990. Fix  resource leaks in BlockReceiver.close().  (Uma Maheswara
+    HDFS-1990. Fix resource leaks in BlockReceiver.close().  (Uma Maheswara
     Rao G via szetszwo)
 
+    HDFS-2034. Length in DFSInputStream.getBlockRange(..) becomes -ve when
+    reading only from a currently being written block. (John George via
+    szetszwo)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1144480&r1=1144479&r2=1144480&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Jul  8 20:13:26 2011
@@ -294,8 +294,8 @@ public class DFSInputStream extends FSIn
 
   /**
    * Get blocks in the specified range.
-   * Fetch them from the namenode if not cached.
-   * 
+   * Fetch them from the namenode if not cached. This function
+   * will not get a read request beyond the EOF.
    * @param offset
    * @param length
    * @return consequent segment of located blocks
@@ -304,28 +304,31 @@ public class DFSInputStream extends FSIn
   private synchronized List<LocatedBlock> getBlockRange(long offset, 
                                                         long length) 
                                                       throws IOException {
+    // getFileLength(): returns total file length
+    // locatedBlocks.getFileLength(): returns length of completed blocks
+    if (offset >= getFileLength()) {
+      throw new IOException("Offset: " + offset +
+        " exceeds file length: " + getFileLength());
+    }
+
     final List<LocatedBlock> blocks;
-    if (locatedBlocks.isLastBlockComplete()) {
-      blocks = getFinalizedBlockRange(offset, length);
+    final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
+    final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
+    final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
+
+    if (readOffsetWithinCompleteBlk) {
+      //get the blocks of finalized (completed) block range
+      blocks = getFinalizedBlockRange(offset, 
+        Math.min(length, lengthOfCompleteBlk - offset));
+    } else {
+      blocks = new ArrayList<LocatedBlock>(1);
     }
-    else {
-      final boolean readPastEnd = offset + length > locatedBlocks.getFileLength();
-      /* if requested length is greater than current file length
-       * then, it could possibly be from the current block being
-       * written to. First get the finalized block range and then
-       * if necessary, get the length of last block being written
-       * to.
-       */
-      if (readPastEnd)
-        length = locatedBlocks.getFileLength() - offset;
-
-      blocks = getFinalizedBlockRange(offset, length);
-      /* requested length is greater than what finalized blocks 
-       * have.
-       */
-      if (readPastEnd)
-        blocks.add(locatedBlocks.getLastLocatedBlock());
+
+    // get the blocks from incomplete block range
+    if (readLengthPastCompleteBlk) {
+       blocks.add(locatedBlocks.getLastLocatedBlock());
     }
+
     return blocks;
   }
 

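To make the rewritten control flow above concrete, here is a small self-contained sketch (not part of the patch): LocatedBlock is replaced by a plain String so the snippet runs on its own, and the offset-versus-EOF check is omitted.

    import java.util.ArrayList;
    import java.util.List;

    class BlockRangeSketch {
      // Mirrors the new getBlockRange(..) branching with placeholder values.
      static List<String> getBlockRange(long offset, long length,
                                        long lengthOfCompleteBlk) {
        final List<String> blocks = new ArrayList<String>();
        // Finalized (completed) block range: consulted only when the
        // offset itself falls within the completed blocks.
        if (offset < lengthOfCompleteBlk) {
          long finalizedLen = Math.min(length, lengthOfCompleteBlk - offset);
          blocks.add("finalized range [" + offset + ", "
              + (offset + finalizedLen) + ")");
        }
        // The block currently being written: appended whenever the read
        // extends past the completed blocks.
        if (offset + length > lengthOfCompleteBlk) {
          blocks.add("last located (under-construction) block");
        }
        return blocks;
      }

      public static void main(String[] args) {
        long complete = 100 * 1024;
        // Entirely within finalized blocks:
        System.out.println(getBlockRange(0, 50 * 1024, complete));
        // Spanning finalized blocks and the block being written:
        System.out.println(getBlockRange(90 * 1024, 20 * 1024, complete));
        // Entirely within the block being written (the HDFS-2034 case):
        System.out.println(getBlockRange(120 * 1024, 8 * 1024, complete));
      }
    }

In the third call the old code would have computed a negative length; the new code skips getFinalizedBlockRange(..) entirely and returns just the last located block.
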
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java?rev=1144480&r1=1144479&r2=1144480&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java Fri Jul  8 20:13:26 2011
@@ -44,6 +44,7 @@ public class TestWriteRead {
 
   private static final int BUFFER_SIZE = 8192 * 100;
   private static final String ROOT_DIR = "/tmp/";
+  private static final long blockSize = 1024*100;
 
   // command-line options. Different defaults for unit test vs real cluster
   String filenameOption = ROOT_DIR + "fileX1";
@@ -69,8 +70,8 @@ public class TestWriteRead {
     LOG.info("initJunitModeTest");
 
     conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 100); // 100K
-                                                                // blocksize
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); // 100K
+                                                              // blocksize
 
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
@@ -99,15 +100,14 @@ public class TestWriteRead {
   }
 
   /** Junit Test reading while writing. */
-  
   @Test
   public void testWriteReadSeq() throws IOException {
     useFCOption = false; 
     positionReadOption = false;
     String fname = filenameOption;
-    
+    long rdBeginPos = 0;
     // need to run long enough to fail: takes 25 to 35 sec on Mac
-    int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
+    int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE, rdBeginPos);
     LOG.info("Summary status from test1: status= " + stat);
     Assert.assertEquals(0, stat);
   }
@@ -117,14 +117,27 @@ public class TestWriteRead {
   public void testWriteReadPos() throws IOException {
     String fname = filenameOption;
     positionReadOption = true;   // position read
-    int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
+    long rdBeginPos = 0;
+    int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE, rdBeginPos);
     Assert.assertEquals(0, stat);
   }
 
+  /** Junit Test position read of the current block being written. */
+  @Test
+  public void testReadPosCurrentBlock() throws IOException {
+    String fname = filenameOption;
+    positionReadOption = true;   // position read
+    int wrChunkSize = (int)(blockSize) + (int)(blockSize/2);
+    long rdBeginPos = blockSize+1;
+    int numTimes=5;
+    int stat = testWriteAndRead(fname, numTimes, wrChunkSize, rdBeginPos);
+    Assert.assertEquals(0, stat);
+  }
    
   // equivalent of TestWriteRead1
   private int clusterTestWriteRead1() throws IOException {
-    int stat = testWriteAndRead(filenameOption, loopOption, chunkSizeOption);
+    long rdBeginPos = 0;
+    int stat = testWriteAndRead(filenameOption, loopOption, chunkSizeOption, rdBeginPos);
     return stat;
   }
 
@@ -133,10 +146,9 @@ public class TestWriteRead {
    * Return number of bytes read. 
    * Support both sequential read and position read.
    */
-  private long readData(String fname, byte[] buffer, long byteExpected)
+  private long readData(String fname, byte[] buffer, long byteExpected, long beginPosition)
       throws IOException {
     long totalByteRead = 0;
-    long beginPosition = 0;
     Path path = getFullyQualifiedPath(fname);
 
     FSDataInputStream in = null;
@@ -263,7 +275,7 @@ public class TestWriteRead {
    * After each iteration of write, do a read of the file from begin to end. 
    * Return 0 on success, else number of failure.
    */
-  private int testWriteAndRead(String fname, int loopN, int chunkSize)
+  private int testWriteAndRead(String fname, int loopN, int chunkSize, long readBeginPosition)
       throws IOException {
 
     int countOfFailures = 0;
@@ -324,7 +336,7 @@ public class TestWriteRead {
               + ". TotalByteVisible = " + totalByteVisible + " to file "
               + fname);
         }
-        byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
+        byteVisibleToRead = readData(fname, inBuffer, totalByteVisible, readBeginPosition);
 
         String readmsg = "Written=" + totalByteWritten + " ; Expected Visible="
             + totalByteVisible + " ; Got Visible=" + byteVisibleToRead
@@ -353,7 +365,7 @@ public class TestWriteRead {
 
       out.close();
 
-      byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
+      byteVisibleToRead = readData(fname, inBuffer, totalByteVisible, readBeginPosition);
 
       String readmsg2 = "Written=" + totalByteWritten + " ; Expected Visible="
           + totalByteVisible + " ; Got Visible=" + byteVisibleToRead
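
For reference, here is the geometry that the new testReadPosCurrentBlock exercises, traced with the numbers from the test above (the trace itself is illustrative):

    // With blockSize = 100K as configured in initJunitModeTest:
    long blockSize = 1024 * 100;
    int wrChunkSize = (int) blockSize + (int) (blockSize / 2);  // writes 150K per chunk
    long rdBeginPos = blockSize + 1;                            // reads start at 100K + 1

    // After the first chunk is written, the first 100K block is finalized
    // and 50K sit in the block still being written. A position read that
    // starts at blockSize + 1 therefore touches only the under-construction
    // block, which is exactly the case that made getBlockRange(..) compute
    // a negative length before this fix.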