You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ra...@apache.org on 2009/07/14 22:18:08 UTC

svn commit: r794054 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java

Author: rangadi
Date: Tue Jul 14 20:18:07 2009
New Revision: 794054

URL: http://svn.apache.org/viewvc?rev=794054&view=rev
Log:
HDFS-445. pread() does not pick up changes to block locations. (Kan Zhang via rangadi)

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=794054&r1=794053&r2=794054&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Jul 14 20:18:07 2009
@@ -77,6 +77,9 @@
     HDFS-489. Update TestHDFSCLI for the -skipTrash option in rm. (Jakob Homan
     via szetszwo)
 
+    HDFS-445. pread() does not pick up changes to block locations. 
+    (Kan Zhang via rangadi) 
+
 Release 0.20.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=794054&r1=794053&r2=794054&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jul 14 20:18:07 2009
@@ -1506,10 +1506,12 @@
      * Fetch it from the namenode if not cached.
      * 
      * @param offset
+     * @param updatePosition whether to update current position
      * @return located block
      * @throws IOException
      */
-    private synchronized LocatedBlock getBlockAt(long offset) throws IOException {
+    private synchronized LocatedBlock getBlockAt(long offset,
+        boolean updatePosition) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       // search cached blocks first
       int targetBlockIdx = locatedBlocks.findBlock(offset);
@@ -1523,14 +1525,16 @@
       }
       LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
       // update current position
-      this.pos = offset;
-      this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
-      this.currentBlock = blk.getBlock();
+      if (updatePosition) {
+        this.pos = offset;
+        this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
+        this.currentBlock = blk.getBlock();
+      }
       return blk;
     }
 
     /** Fetch a block from namenode and cache it */
-    private synchronized void fetchAndCacheBlockAt(long offset) throws IOException {
+    private synchronized void fetchBlockAt(long offset) throws IOException {
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
@@ -1544,17 +1548,6 @@
       locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
     }
 
-    /** Fetch a block without caching */
-    private LocatedBlock fetchBlockAt(long offset) throws IOException {
-      LocatedBlocks newBlocks;
-      newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
-      if (newBlocks == null) {
-        throw new IOException("Could not find target position " + offset);
-      }
-      int index = newBlocks.findBlock(offset);
-      return newBlocks.get(index);
-    }
-    
     /**
      * Get blocks in the specified range.
      * Fetch them from the namenode if not cached.
@@ -1624,7 +1617,7 @@
         //
         // Compute desired block
         //
-        LocatedBlock targetBlock = getBlockAt(target);
+        LocatedBlock targetBlock = getBlockAt(target, true);
         assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
         long offsetIntoBlock = target - targetBlock.getStartOffset();
 
@@ -1659,7 +1652,7 @@
              * access key from its memory since it's considered expired based on
              * the estimated expiration date.
              */
-            fetchAndCacheBlockAt(target);
+            fetchBlockAt(target);
           } else {
             // Put chosen node into dead list, continue
             addToDeadNodes(chosenNode);
@@ -1821,13 +1814,16 @@
           if (nodes == null || nodes.length == 0) {
             LOG.info("No node available for block: " + blockInfo);
           }
-          LOG.info("Could not obtain block " + block.getBlock() + " from any node:  " + ie);
+          LOG.info("Could not obtain block " + block.getBlock()
+              + " from any node: " + ie
+              + ". Will get new block locations from namenode and retry...");
           try {
             Thread.sleep(3000);
           } catch (InterruptedException iex) {
           }
           deadNodes.clear(); //2nd option is to remove only nodes[blockId]
           openInfo();
+          block = getBlockAt(block.getStartOffset(), false);
           failures++;
           continue;
         }
@@ -1840,11 +1836,13 @@
       // Connect to best DataNode for desired Block, with potential offset
       //
       Socket dn = null;
-      int numAttempts = block.getLocations().length;
-      IOException ioe = null;
       int refetchToken = 1; // only need to get a new access token once
       
-      while (dn == null && numAttempts-- > 0 ) {
+      while (true) {
+        // cached block locations may have been updated by chooseDataNode()
+        // or fetchBlockAt(). Always get the latest list of locations at the 
+        // start of the loop.
+        block = getBlockAt(block.getStartOffset(), false);
         DNAddrPair retval = chooseDataNode(block);
         DatanodeInfo chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
@@ -1871,21 +1869,18 @@
           }
           return;
         } catch (ChecksumException e) {
-          ioe = e;
           LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
                    src + " at " + block.getBlock() + ":" + 
                    e.getPos() + " from " + chosenNode.getName());
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
-          ioe = e;
           if (e instanceof InvalidAccessTokenException && refetchToken-- > 0) {
             LOG.info("Invalid access token when connecting to " + targetAddr
                 + " for file " + src + " for block "
                 + block.getBlock() + ":"
                 + StringUtils.stringifyException(e)
                 + ", get a new access token and retry...");
-            block = fetchBlockAt(block.getStartOffset());
-            numAttempts = block.getLocations().length;
+            fetchBlockAt(block.getStartOffset());
             continue;
           } else {
             LOG.warn("Failed to connect to " + targetAddr + " for file " + src
@@ -1895,12 +1890,10 @@
         } finally {
           IOUtils.closeStream(reader);
           IOUtils.closeSocket(dn);
-          dn = null;
         }
         // Put chosen node into dead list, continue
         addToDeadNodes(chosenNode);
       }
-      throw (ioe == null) ? new IOException("Could not read data") : ioe;
     }
 
     /**

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=794054&r1=794053&r2=794054&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Jul 14 20:18:07 2009
@@ -657,6 +657,18 @@
   }
 
   /*
+   * Restart all datanodes
+   */
+  public synchronized boolean restartDataNodes() throws IOException {
+    for (int i = dataNodes.size()-1; i >= 0; i--) {
+      System.out.println("Restarting DataNode " + i);
+      if (!restartDataNode(i)) 
+        return false;
+    }
+    return true;
+  }
+
+  /*
    * Shutdown a datanode by name.
    */
   public synchronized DataNodeProperties stopDataNode(String name) {
@@ -731,7 +743,7 @@
     while(client.datanodeReport(DatanodeReportType.LIVE).length
         != numDataNodes) {
       try {
-        Thread.sleep(500);
+        Thread.sleep(100);
       } catch (Exception e) {
       }
     }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java?rev=794054&r1=794053&r2=794054&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java Tue Jul 14 20:18:07 2009
@@ -154,6 +154,37 @@
     
     stm.close();
   }
+    
+  // test pread can survive datanode restarts
+  private void datanodeRestartTest(MiniDFSCluster cluster, FileSystem fileSys,
+      Path name) throws IOException {
+    // skip this test if using simulated storage since simulated blocks
+    // don't survive datanode restarts.
+    if (simulatedStorage) {
+      return;
+    }
+    int numBlocks = 1;
+    assertTrue(numBlocks <= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES);
+    byte[] expected = new byte[numBlocks * blockSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+    byte[] actual = new byte[numBlocks * blockSize];
+    FSDataInputStream stm = fileSys.open(name);
+    // read a block and get block locations cached as a result
+    stm.readFully(0, actual);
+    checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Setup");
+    // restart all datanodes. it is expected that they will
+    // restart on different ports, hence, cached block locations
+    // will no longer work.
+    assertTrue(cluster.restartDataNodes());
+    cluster.waitActive();
+    // verify the block can be read again using the same InputStream 
+    // (via re-fetching of block locations from namenode). there is a 
+    // 3 sec sleep in chooseDataNode(), which can be shortened for 
+    // this test if configurable.
+    stm.readFully(0, actual);
+    checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test");
+  }
   
   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.exists(name));
@@ -185,6 +216,7 @@
       Path file1 = new Path("preadtest.dat");
       writeFile(fileSys, file1);
       pReadFile(fileSys, file1);
+      datanodeRestartTest(cluster, fileSys, file1);
       cleanupFile(fileSys, file1);
     } finally {
       fileSys.close();
@@ -195,7 +227,7 @@
   public void testPreadDFSSimulated() throws IOException {
     simulatedStorage = true;
     testPreadDFS();
-    simulatedStorage = true;
+    simulatedStorage = false;
   }
   
   /**