You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:39:27 UTC

svn commit: r1077090 - in /hadoop/common/branches/branch-0.20-security-patches/src: hdfs/org/apache/hadoop/hdfs/DFSClient.java test/org/apache/hadoop/hdfs/MiniDFSCluster.java test/org/apache/hadoop/hdfs/TestPread.java

Author: omalley
Date: Fri Mar  4 03:39:26 2011
New Revision: 1077090

URL: http://svn.apache.org/viewvc?rev=1077090&view=rev
Log:
commit aae5dba8e7b545d5072e1f6e504b656fdb4d6e5f
Author: Jitendra Nath Pandey <ji...@yahoo-inc.com>
Date:   Wed Dec 23 16:39:18 2009 -0800

    HDFS-445 from https://issues.apache.org/jira/secure/attachment/12428885/HDFS-445-0_20.2.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    HDFS-445. pread() fails when cached block locations are no longer valid.
    +    (Jitendra Nath Pandey)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestPread.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077090&r1=1077089&r2=1077090&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar  4 03:39:26 2011
@@ -1551,10 +1551,12 @@ public class DFSClient implements FSCons
      * Fetch it from the namenode if not cached.
      * 
      * @param offset
+     * @param updatePosition whether to update current position
      * @return located block
      * @throws IOException
      */
-    private synchronized LocatedBlock getBlockAt(long offset) throws IOException {
+    private synchronized LocatedBlock getBlockAt(long offset,
+        boolean updatePosition) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       // search cached blocks first
       int targetBlockIdx = locatedBlocks.findBlock(offset);
@@ -1568,14 +1570,16 @@ public class DFSClient implements FSCons
       }
       LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
       // update current position
-      this.pos = offset;
-      this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
-      this.currentBlock = blk.getBlock();
+      if (updatePosition) {
+        this.pos = offset;
+        this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
+        this.currentBlock = blk.getBlock();
+      }
       return blk;
     }
 
     /** Fetch a block from namenode and cache it */
-    private synchronized void fetchAndCacheBlockAt(long offset) throws IOException {
+    private synchronized void fetchBlockAt(long offset) throws IOException {
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
@@ -1589,17 +1593,6 @@ public class DFSClient implements FSCons
       locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
     }
 
-    /** Fetch a block without caching */
-    private LocatedBlock fetchBlockAt(long offset) throws IOException {
-      LocatedBlocks newBlocks;
-      newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
-      if (newBlocks == null) {
-        throw new IOException("Could not find target position " + offset);
-      }
-      int index = newBlocks.findBlock(offset);
-      return newBlocks.get(index);
-    }
-    
     /**
      * Get blocks in the specified range.
      * Fetch them from the namenode if not cached.
@@ -1669,7 +1662,7 @@ public class DFSClient implements FSCons
         //
         // Compute desired block
         //
-        LocatedBlock targetBlock = getBlockAt(target);
+        LocatedBlock targetBlock = getBlockAt(target, true);
         assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
         long offsetIntoBlock = target - targetBlock.getStartOffset();
 
@@ -1704,7 +1697,7 @@ public class DFSClient implements FSCons
              * access key from its memory since it's considered expired based on
              * the estimated expiration date.
              */
-            fetchAndCacheBlockAt(target);
+            fetchBlockAt(target);
           } else {
             // Put chosen node into dead list, continue
             addToDeadNodes(chosenNode);
@@ -1866,13 +1859,16 @@ public class DFSClient implements FSCons
           if (nodes == null || nodes.length == 0) {
             LOG.info("No node available for block: " + blockInfo);
           }
-          LOG.info("Could not obtain block " + block.getBlock() + " from any node:  " + ie);
+          LOG.info("Could not obtain block " + block.getBlock()
+              + " from any node: " + ie
+              + ". Will get new block locations from namenode and retry...");
           try {
             Thread.sleep(3000);
           } catch (InterruptedException iex) {
           }
           deadNodes.clear(); //2nd option is to remove only nodes[blockId]
           openInfo();
+          block = getBlockAt(block.getStartOffset(), false);
           failures++;
           continue;
         }
@@ -1885,11 +1881,13 @@ public class DFSClient implements FSCons
       // Connect to best DataNode for desired Block, with potential offset
       //
       Socket dn = null;
-      int numAttempts = block.getLocations().length;
-      IOException ioe = null;
       int refetchToken = 1; // only need to get a new access token once
       
-      while (dn == null && numAttempts-- > 0 ) {
+      while (true) {
+        // cached block locations may have been updated by chooseDataNode()
+        // or fetchBlockAt(). Always get the latest list of locations at the 
+        // start of the loop.
+        block = getBlockAt(block.getStartOffset(), false);
         DNAddrPair retval = chooseDataNode(block);
         DatanodeInfo chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
@@ -1916,21 +1914,18 @@ public class DFSClient implements FSCons
           }
           return;
         } catch (ChecksumException e) {
-          ioe = e;
           LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
                    src + " at " + block.getBlock() + ":" + 
                    e.getPos() + " from " + chosenNode.getName());
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
-          ioe = e;
           if (e instanceof InvalidAccessTokenException && refetchToken-- > 0) {
             LOG.info("Invalid access token when connecting to " + targetAddr
                 + " for file " + src + " for block "
                 + block.getBlock() + ":"
                 + StringUtils.stringifyException(e)
                 + ", get a new access token and retry...");
-            block = fetchBlockAt(block.getStartOffset());
-            numAttempts = block.getLocations().length;
+            fetchBlockAt(block.getStartOffset());
             continue;
           } else {
             LOG.warn("Failed to connect to " + targetAddr + " for file " + src
@@ -1940,12 +1935,10 @@ public class DFSClient implements FSCons
         } finally {
           IOUtils.closeStream(reader);
           IOUtils.closeSocket(dn);
-          dn = null;
         }
         // Put chosen node into dead list, continue
         addToDeadNodes(chosenNode);
       }
-      throw (ioe == null) ? new IOException("Could not read data") : ioe;
     }
 
     /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1077090&r1=1077089&r2=1077090&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Mar  4 03:39:26 2011
@@ -648,6 +648,18 @@ public class MiniDFSCluster {
   }
 
   /*
+   * Restart all datanodes; stops and returns false as soon as restartDataNode() returns false for one of them
+   */
+  public synchronized boolean restartDataNodes() throws IOException {
+    for (int i = dataNodes.size()-1; i >= 0; i--) {
+      System.out.println("Restarting DataNode " + i);
+      if (!restartDataNode(i)) 
+        return false;
+    }
+    return true;
+  }
+
+  /*
    * Shutdown a datanode by name.
    */
   public synchronized DataNodeProperties stopDataNode(String name) {
@@ -726,7 +738,7 @@ public class MiniDFSCluster {
     while(client.datanodeReport(DatanodeReportType.LIVE).length
         != numDataNodes) {
       try {
-        Thread.sleep(500);
+        Thread.sleep(100);
       } catch (Exception e) {
       }
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestPread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestPread.java?rev=1077090&r1=1077089&r2=1077090&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestPread.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestPread.java Fri Mar  4 03:39:26 2011
@@ -151,6 +151,37 @@ public class TestPread extends TestCase 
     
     stm.close();
   }
+    
+  // test that pread can survive datanode restarts
+  private void datanodeRestartTest(MiniDFSCluster cluster, FileSystem fileSys,
+      Path name) throws IOException {
+    // skip this test if using simulated storage since simulated blocks
+    // don't survive datanode restarts.
+    if (simulatedStorage) {
+      return;
+    }
+    int numBlocks = 1;
+    assertTrue(numBlocks <= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES);
+    byte[] expected = new byte[numBlocks * blockSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+    byte[] actual = new byte[numBlocks * blockSize];
+    FSDataInputStream stm = fileSys.open(name);
+    // read a block and get block locations cached as a result
+    stm.readFully(0, actual);
+    checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Setup");
+    // restart all datanodes. it is expected that they will
+    // restart on different ports, hence, cached block locations
+    // will no longer work.
+    assertTrue(cluster.restartDataNodes());
+    cluster.waitActive();
+    // verify the block can be read again using the same InputStream
+    // (by re-fetching block locations from the namenode). there is a
+    // 3 sec sleep in chooseDataNode(); it could be shortened for this
+    // test if it were made configurable.
+    stm.readFully(0, actual);
+    checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test");
+  }
   
   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.exists(name));
@@ -182,6 +213,7 @@ public class TestPread extends TestCase 
       Path file1 = new Path("preadtest.dat");
       writeFile(fileSys, file1);
       pReadFile(fileSys, file1);
+      datanodeRestartTest(cluster, fileSys, file1);
       cleanupFile(fileSys, file1);
     } finally {
       fileSys.close();
@@ -192,7 +224,7 @@ public class TestPread extends TestCase 
   public void testPreadDFSSimulated() throws IOException {
     simulatedStorage = true;
     testPreadDFS();
-    simulatedStorage = true;
+    simulatedStorage = false;
   }
   
   /**