You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2008/10/17 22:30:44 UTC

svn commit: r705744 - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/test/org/apache/hadoop/hdfs/

Author: szetszwo
Date: Fri Oct 17 13:30:43 2008
New Revision: 705744

URL: http://svn.apache.org/viewvc?rev=705744&view=rev
Log:
HADOOP-4423. Keep block length when the block recovery is triggered by append.  (szetszwo)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestFileAppend3.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Oct 17 13:30:43 2008
@@ -937,9 +937,6 @@
     HADOOP-4426. TestCapacityScheduler broke due to the two commits HADOOP-4053
     and HADOOP-4373. This patch fixes that. (Hemanth Yamijala via ddas)
 
-    HADOOP-4278. Increase debug logging for unit test TestDatanodeDeath.
-    (dhruba)
-
     HADOOP-4418. Updates documentation in forrest for Mapred, streaming and pipes.
     (Amareshwari Sriramadasu via ddas)
 
@@ -953,8 +950,11 @@
     HADOOP-4427. Adds the new queue/job commands to the manual.
     (Sreekanth Ramakrishnan via ddas)
 
-    HADOOP-4278. If the primary datanode fails in DFSClent, remove it from
-    the pipe line.  (dhruba via szetszwo)
+    HADOOP-4278. Increase debug logging for unit test TestDatanodeDeath.
+    Fix the case when primary is dead.  (dhruba via szetszwo)
+
+    HADOOP-4423. Keep block length when the block recovery is triggered by
+    append.  (szetszwo)
 
 Release 0.18.2 - Unreleased
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Oct 17 13:30:43 2008
@@ -2146,7 +2146,7 @@
           synchronized (dataQueue) {
 
             // process IO errors if any
-            boolean doSleep = processDatanodeError(hasError);
+            boolean doSleep = processDatanodeError(hasError, false);
 
             // wait for a packet to be sent.
             while ((!closed && !hasError && clientRunning 
@@ -2371,7 +2371,7 @@
     // threads and mark stream as closed. Returns true if we should
     // sleep for a while after returning from this call.
     //
-    private boolean processDatanodeError(boolean hasError) {
+    private boolean processDatanodeError(boolean hasError, boolean isAppend) {
       if (!hasError) {
         return false;
       }
@@ -2453,7 +2453,7 @@
           // Pick the "least" datanode as the primary datanode to avoid deadlock.
           primaryNode = Collections.min(Arrays.asList(newnodes));
           primary = createClientDatanodeProtocolProxy(primaryNode, conf);
-          newBlock = primary.recoverBlock(block, newnodes);
+          newBlock = primary.recoverBlock(block, isAppend, newnodes);
         } catch (IOException e) {
           recoveryErrorCount++;
           if (recoveryErrorCount > maxRecoveryErrorCount) {
@@ -2640,7 +2640,7 @@
                                 "of file " + src);
                         
         }
-        processDatanodeError(true);
+        processDatanodeError(true, true);
         streamer.start();
       }
       else {

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Oct 17 13:30:43 2008
@@ -29,17 +29,19 @@
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 2: recoverBlock returns the datanodes on which recovery was successful.
+   * 3: add keepLength parameter.
    */
-  public static final long versionID = 2L;
+  public static final long versionID = 3L;
 
   /** Start generation-stamp recovery for specified block
    * @param block the specified block
+   * @param keepLength keep the block length
    * @param targets the list of possible locations of specified block
    * @return the new blockid if recovery successful and the generation stamp
    * got updated as part of the recovery, else returns null if the block id
    * not have any data and the block was deleted.
    * @throws IOException
    */
-  LocatedBlock recoverBlock(Block block, DatanodeInfo[] targets) throws IOException;
+  LocatedBlock recoverBlock(Block block, boolean keepLength,
+      DatanodeInfo[] targets) throws IOException;
 }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Oct 17 13:30:43 2008
@@ -1323,7 +1323,7 @@
         for(int i = 0; i < blocks.length; i++) {
           try {
             logRecoverBlock("NameNode", blocks[i], targets[i]);
-            recoverBlock(blocks[i], targets[i], true);
+            recoverBlock(blocks[i], false, targets[i], true);
           } catch (IOException e) {
             LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
           }
@@ -1336,8 +1336,9 @@
 
   /** {@inheritDoc} */
   public void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException {
-    LOG.info("oldblock=" + oldblock + ", newblock=" + newblock
-        + ", datanode=" + dnRegistration.getName());
+    LOG.info("oldblock=" + oldblock + "(length=" + oldblock.getNumBytes()
+        + "), newblock=" + newblock + "(length=" + newblock.getNumBytes()
+        + "), datanode=" + dnRegistration.getName());
     data.updateBlock(oldblock, newblock);
     if (finalize) {
       data.finalizeBlock(newblock);
@@ -1380,8 +1381,8 @@
   }
 
   /** Recover a block */
-  private LocatedBlock recoverBlock(Block block, DatanodeID[] datanodeids,
-      boolean closeFile) throws IOException {
+  private LocatedBlock recoverBlock(Block block, boolean keepLength,
+      DatanodeID[] datanodeids, boolean closeFile) throws IOException {
 
     // If the block is already being recovered, then skip recovering it.
     // This can happen if the namenode and client start recovering the same
@@ -1409,9 +1410,16 @@
               this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
           BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
           if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
-            syncList.add(new BlockRecord(id, datanode, new Block(info)));
-            if (info.getNumBytes() < minlength) {
-              minlength = info.getNumBytes();
+            if (keepLength) {
+              if (info.getNumBytes() == block.getNumBytes()) {
+                syncList.add(new BlockRecord(id, datanode, new Block(info)));
+              }
+            }
+            else {
+              syncList.add(new BlockRecord(id, datanode, new Block(info)));
+              if (info.getNumBytes() < minlength) {
+                minlength = info.getNumBytes();
+              }
             }
           }
         } catch (IOException e) {
@@ -1426,7 +1434,10 @@
         throw new IOException("All datanodes failed: block=" + block
             + ", datanodeids=" + Arrays.asList(datanodeids));
       }
-      return syncBlock(block, minlength, syncList, closeFile);
+      if (!keepLength) {
+        block.setNumBytes(minlength);
+      }
+      return syncBlock(block, syncList, closeFile);
     } finally {
       synchronized (ongoingRecovery) {
         ongoingRecovery.remove(block);
@@ -1435,11 +1446,11 @@
   }
 
   /** Block synchronization */
-  private LocatedBlock syncBlock(Block block, long minlength,
-      List<BlockRecord> syncList, boolean closeFile) throws IOException {
+  private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
+      boolean closeFile) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("block=" + block + ", minlength=" + minlength
-          + ", syncList=" + syncList + ", closeFile=" + closeFile);
+      LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+          + "), syncList=" + syncList + ", closeFile=" + closeFile);
     }
 
     //syncList.isEmpty() that all datanodes do not have the block
@@ -1453,7 +1464,7 @@
     List<DatanodeID> successList = new ArrayList<DatanodeID>();
 
     long generationstamp = namenode.nextGenerationStamp(block);
-    Block newblock = new Block(block.getBlockId(), minlength, generationstamp);
+    Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
 
     for(BlockRecord r : syncList) {
       try {
@@ -1489,10 +1500,10 @@
   
   // ClientDataNodeProtocol implementation
   /** {@inheritDoc} */
-  public LocatedBlock recoverBlock(Block block, DatanodeInfo[] targets
+  public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
       ) throws IOException {
     logRecoverBlock("Client", block, targets);
-    return recoverBlock(block, targets, false);
+    return recoverBlock(block, keepLength, targets, false);
   }
 
   private static void logRecoverBlock(String who,

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Oct 17 13:30:43 2008
@@ -1174,9 +1174,7 @@
       f = tmp;
     }
     if (f == null) {
-      throw new IOException("Block " + b + 
-                            " block file " + f +
-                            " does not exist on disk.");
+      throw new IOException("Block " + b + " does not exist on disk.");
     }
     if (!f.exists()) {
       throw new IOException("Block " + b + 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestFileAppend3.java Fri Oct 17 13:30:43 2008
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
 
 import junit.extensions.TestSetup;
 import junit.framework.Test;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
 import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
 
 /** This class implements some of tests posted in HADOOP-2658. */
@@ -139,6 +141,51 @@
     out.close();        
   }
 
+  /** TC7: Corrupted replicas are present. */
+  public void testTC7() throws Exception {
+    final short repl = 2;
+    final Path p = new Path("/TC7/foo");
+    System.out.println("p=" + p);
+    
+    //a. Create file with replication factor of 2. Write half block of data. Close file.
+    final int len1 = (int)(BLOCK_SIZE/2); 
+    {
+      FSDataOutputStream out = fs.create(p, false, buffersize, repl, BLOCK_SIZE);
+      AppendTestUtil.write(out, 0, len1);
+      out.close();
+    }
+    DFSTestUtil.waitReplication(fs, p, repl);
+
+    //b. Log into one datanode that has one replica of this block.
+    //   Find the block file on this datanode and truncate it to zero size.
+    final LocatedBlocks locatedblocks = fs.dfs.namenode.getBlockLocations(p.toString(), 0L, len1);
+    assertEquals(1, locatedblocks.locatedBlockCount());
+    final LocatedBlock lb = locatedblocks.get(0);
+    final Block blk = lb.getBlock();
+    assertEquals(len1, lb.getBlockSize());
+
+    DatanodeInfo[] datanodeinfos = lb.getLocations();
+    assertEquals(repl, datanodeinfos.length);
+    final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+    final FSDataset data = (FSDataset)dn.getFSDataset();
+    final RandomAccessFile raf = new RandomAccessFile(data.getBlockFile(blk), "rw");
+    AppendTestUtil.LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
+    assertEquals(len1, raf.length());
+    raf.setLength(0);
+    raf.close();
+
+    //c. Open file in "append mode".  Append a new block worth of data. Close file.
+    final int len2 = (int)BLOCK_SIZE; 
+    {
+      FSDataOutputStream out = fs.append(p);
+      AppendTestUtil.write(out, len1, len2);
+      out.close();
+    }
+
+    //d. Reopen file and read two blocks worth of data.
+    AppendTestUtil.check(fs, p, len1 + len2);
+  }
+
   /** TC11: Racing rename */
   public void testTC11() throws Exception {
     final Path p = new Path("/TC11/foo");