You are viewing a plain text version of this content. The canonical version is the original message in the mailing-list archive (the hyperlink was not preserved in this plain-text copy).
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2010/06/16 22:53:12 UTC

svn commit: r955377 - in /hadoop/common/branches/branch-0.20-append: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/DFSClient.java src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java src/test/org/apache/hadoop/hdfs/TestPread.java

Author: dhruba
Date: Wed Jun 16 20:53:12 2010
New Revision: 955377

URL: http://svn.apache.org/viewvc?rev=955377&view=rev
Log:
HDFS-445. pread should refetch block locations when necessary.
(Todd Lipcon via dhruba)


Modified:
    hadoop/common/branches/branch-0.20-append/CHANGES.txt
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestPread.java

Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=955377&r1=955376&r2=955377&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Wed Jun 16 20:53:12 2010
@@ -28,6 +28,9 @@ Release 0.20-append - Unreleased
     HDFS-1054. remove sleep before retry for allocating a block.
     (Todd Lipcon via dhruba)
 
+    HDFS-445. pread should refetch block locations when necessary.
+    (Todd Lipcon via dhruba)
+
   IMPROVEMENTS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=955377&r1=955376&r2=955377&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Wed Jun 16 20:53:12 2010
@@ -1584,10 +1584,12 @@ public class DFSClient implements FSCons
      * Fetch it from the namenode if not cached.
      * 
      * @param offset
+     * @param updatePosition whether to update current position
      * @return located block
      * @throws IOException
      */
-    private LocatedBlock getBlockAt(long offset) throws IOException {
+    private synchronized LocatedBlock getBlockAt(long offset,
+        boolean updatePosition) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       // search cached blocks first
       int targetBlockIdx = locatedBlocks.findBlock(offset);
@@ -1601,9 +1603,11 @@ public class DFSClient implements FSCons
       }
       LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
       // update current position
-      this.pos = offset;
-      this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
-      this.currentBlock = blk.getBlock();
+      if (updatePosition) {
+        this.pos = offset;
+        this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
+        this.currentBlock = blk.getBlock();
+      }
       return blk;
     }
 
@@ -1670,7 +1674,7 @@ public class DFSClient implements FSCons
       //
       // Compute desired block
       //
-      LocatedBlock targetBlock = getBlockAt(target);
+      LocatedBlock targetBlock = getBlockAt(target, true);
       assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
       long offsetIntoBlock = target - targetBlock.getStartOffset();
 
@@ -1859,13 +1863,16 @@ public class DFSClient implements FSCons
           if (nodes == null || nodes.length == 0) {
             LOG.info("No node available for block: " + blockInfo);
           }
-          LOG.info("Could not obtain block " + block.getBlock() + " from any node:  " + ie);
+          LOG.info("Could not obtain block " + block.getBlock()
+              + " from any node: " + ie
+              + ". Will get new block locations from namenode and retry...");
           try {
             Thread.sleep(3000);
           } catch (InterruptedException iex) {
           }
           deadNodes.clear(); //2nd option is to remove only nodes[blockId]
           openInfo();
+          block = getBlockAt(block.getStartOffset(), false);
           failures++;
           continue;
         }
@@ -1878,10 +1885,13 @@ public class DFSClient implements FSCons
       // Connect to best DataNode for desired Block, with potential offset
       //
       Socket dn = null;
-      int numAttempts = block.getLocations().length;
-      IOException ioe = null;
-      
-      while (dn == null && numAttempts-- > 0 ) {
+            
+      while (true) {
+        // cached block locations may have been updated by chooseDataNode()
+        // or fetchBlockAt(). Always get the latest list of locations at the 
+        // start of the loop.
+        block = getBlockAt(block.getStartOffset(), false);
+
         DNAddrPair retval = chooseDataNode(block);
         DatanodeInfo chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
@@ -1906,13 +1916,11 @@ public class DFSClient implements FSCons
           }
           return;
         } catch (ChecksumException e) {
-          ioe = e;
           LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
                    src + " at " + block.getBlock() + ":" + 
                    e.getPos() + " from " + chosenNode.getName());
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
-          ioe = e;
           LOG.warn("Failed to connect to " + targetAddr + 
                    " for file " + src + 
                    " for block " + block.getBlock().getBlockId() + ":"  +
@@ -1920,12 +1928,10 @@ public class DFSClient implements FSCons
         } finally {
           IOUtils.closeStream(reader);
           IOUtils.closeSocket(dn);
-          dn = null;
         }
         // Put chosen node into dead list, continue
         addToDeadNodes(chosenNode);
       }
-      throw (ioe == null) ? new IOException("Could not read data") : ioe;
     }
 
     /**

Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=955377&r1=955376&r2=955377&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Jun 16 20:53:12 2010
@@ -652,6 +652,18 @@ public class MiniDFSCluster {
   }
 
   /*
+   * Restart all datanodes
+   */
+  public synchronized boolean restartDataNodes() throws IOException {
+    for (int i = dataNodes.size()-1; i >= 0; i--) {
+      System.out.println("Restarting DataNode " + i);
+      if (!restartDataNode(i)) 
+        return false;
+    }
+    return true;
+  }
+
+  /*
    * Shutdown a datanode by name.
    */
   public synchronized DataNodeProperties stopDataNode(String name) {

Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestPread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestPread.java?rev=955377&r1=955376&r2=955377&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestPread.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestPread.java Wed Jun 16 20:53:12 2010
@@ -151,6 +151,37 @@ public class TestPread extends TestCase 
     
     stm.close();
   }
+    
+  // test pread can survive datanode restarts
+  private void datanodeRestartTest(MiniDFSCluster cluster, FileSystem fileSys,
+      Path name) throws IOException {
+    // skip this test if using simulated storage since simulated blocks
+    // don't survive datanode restarts.
+    if (simulatedStorage) {
+      return;
+    }
+    int numBlocks = 1;
+    assertTrue(numBlocks <= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES);
+    byte[] expected = new byte[numBlocks * blockSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+    byte[] actual = new byte[numBlocks * blockSize];
+    FSDataInputStream stm = fileSys.open(name);
+    // read a block and get block locations cached as a result
+    stm.readFully(0, actual);
+    checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Setup");
+    // restart all datanodes. it is expected that they will
+    // restart on different ports, hence, cached block locations
+    // will no longer work.
+    assertTrue(cluster.restartDataNodes());
+    cluster.waitActive();
+    // verify the block can be read again using the same InputStream 
+    // (via re-fetching of block locations from namenode). there is a 
+    // 3 sec sleep in chooseDataNode(), which can be shortened for 
+    // this test if configurable.
+    stm.readFully(0, actual);
+    checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test");
+  }
   
   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.exists(name));
@@ -182,6 +213,7 @@ public class TestPread extends TestCase 
       Path file1 = new Path("preadtest.dat");
       writeFile(fileSys, file1);
       pReadFile(fileSys, file1);
+      datanodeRestartTest(cluster, fileSys, file1);
       cleanupFile(fileSys, file1);
     } finally {
       fileSys.close();
@@ -192,7 +224,7 @@ public class TestPread extends TestCase 
   public void testPreadDFSSimulated() throws IOException {
     simulatedStorage = true;
     testPreadDFS();
-    simulatedStorage = true;
+    simulatedStorage = false;
   }
   
   /**