Posted to common-commits@hadoop.apache.org by su...@apache.org on 2011/09/10 02:06:42 UTC

svn commit: r1167409 - in /hadoop/common/branches/branch-0.20-security: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/...

Author: suresh
Date: Sat Sep 10 00:06:41 2011
New Revision: 1167409

URL: http://svn.apache.org/viewvc?rev=1167409&view=rev
Log:
HDFS-1218. Blocks recovered on startup should be treated with lower priority during block synchronization. Contributed by Todd Lipcon.
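
The change implements a prioritization rule during block synchronization: replicas that a DataNode re-registered only after a restart ("replicas waiting recovery", RWR, in 0.21 parlance) may have been truncated by a local filesystem journal replay, so they are consulted for the recovered block length only when no live "replica being written" (RBW) remains. A minimal standalone sketch of that decision, using a hypothetical Replica holder that mirrors the wasRecoveredOnStartup flag added by this patch:

    import java.util.ArrayList;
    import java.util.List;

    public class RecoveryPrioritySketch {
      static class Replica {                   // hypothetical stand-in for a reported replica
        final long numBytes;
        final boolean wasRecoveredOnStartup;   // true = RWR, false = RBW
        Replica(long numBytes, boolean wasRecoveredOnStartup) {
          this.numBytes = numBytes;
          this.wasRecoveredOnStartup = wasRecoveredOnStartup;
        }
      }

      /** Pick the length to which the surviving replicas are synchronized. */
      static long chooseRecoveryLength(List<Replica> replicas) {
        int rbwCount = 0;
        for (Replica r : replicas) {
          if (!r.wasRecoveredOnStartup) rbwCount++;
        }
        // Fall back to startup-recovered replicas only when no RBW survives;
        // otherwise a truncated RWR could drag the recovered length down.
        boolean shouldRecoverRwrs = (rbwCount == 0);
        long minLength = Long.MAX_VALUE;
        for (Replica r : replicas) {
          if (!shouldRecoverRwrs && r.wasRecoveredOnStartup) continue;
          minLength = Math.min(minLength, r.numBytes);
        }
        return minLength;
      }

      public static void main(String[] args) {
        List<Replica> reported = new ArrayList<Replica>();
        reported.add(new Replica(1024, false));  // live RBW replica
        reported.add(new Replica(0, true));      // replica truncated after a DN restart
        System.out.println(chooseRecoveryLength(reported));  // prints 1024, not 0
      }
    }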


Added:
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryInfo.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sat Sep 10 00:06:41 2011
@@ -116,6 +116,9 @@ Release 0.20.205.0 - unreleased
     HDFS-1197. Blocks are considered "complete" prematurely after 
     commitBlockSynchronization or DN restart. (Todd Lipcon via jitendra)
 
+    HDFS-1218. Blocks recovered on startup should be treated with lower 
+    priority during block synchronization. (Todd Lipcon via suresh)
+
   IMPROVEMENTS
 
     MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Sat Sep 10 00:06:41 2011
@@ -2748,6 +2748,8 @@ public class DFSClient implements FSCons
 
           // This is used by unit test to trigger race conditions.
           if (artificialSlowdown != 0 && clientRunning) {
+            LOG.debug("Sleeping for artificial slowdown of " +
+                artificialSlowdown + "ms");
             try { 
               Thread.sleep(artificialSlowdown); 
             } catch (InterruptedException e) {}
@@ -3641,6 +3643,16 @@ public class DFSClient implements FSCons
         s = null;
       }
     }
+    
+    /**
+     * Harsh abort method that should only be used from tests - it prevents
+     * pipeline recovery when, for example, a DN shuts down.
+     */
+    void abortForTests() throws IOException {
+      streamer.close();
+      response.close();
+      closed = true;
+    }
  
     // shutdown datastreamer and responseprocessor threads.
     private void closeThreads() throws IOException {

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java Sat Sep 10 00:06:41 2011
@@ -153,6 +153,7 @@ public class DatanodeID implements Writa
   public void updateRegInfo(DatanodeID nodeReg) {
     name = nodeReg.getName();
     infoPort = nodeReg.getInfoPort();
+    ipcPort = nodeReg.getIpcPort();
     // update any more fields added in future.
   }
     

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Sep 10 00:06:41 2011
@@ -82,6 +82,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -1638,6 +1639,7 @@ public class DataNode extends Configured
     if (LOG.isDebugEnabled()) {
       LOG.debug("block=" + block);
     }
+    
     Block stored = data.getStoredBlock(block.getBlockId());
 
     if (stored == null) {
@@ -1656,6 +1658,11 @@ public class DataNode extends Configured
     data.validateBlockMetadata(stored);
     return info;
   }
+  
+  @Override
+  public BlockRecoveryInfo getBlockRecoveryInfo(Block block) throws IOException {
+    return data.getBlockRecoveryInfo(block.getBlockId());
+  }
 
   public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
     Daemon d = new Daemon(threadGroup, new Runnable() {
@@ -1707,21 +1714,26 @@ public class DataNode extends Configured
   private static class BlockRecord { 
     final DatanodeID id;
     final InterDatanodeProtocol datanode;
-    final Block block;
+    final BlockRecoveryInfo info;
     
-    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode,
+        BlockRecoveryInfo info) {
       this.id = id;
       this.datanode = datanode;
-      this.block = block;
+      this.info = info;
     }
 
     /** {@inheritDoc} */
     public String toString() {
-      return "block:" + block + " node:" + id;
+      return "BlockRecord(info=" + info + " node=" + id + ")";
     }
   }
 
-  /** Recover a block */
+  /** Recover a block
+   * @param keepLength if true, will only recover replicas that have the same length
+   * as the block passed in. Otherwise, will calculate the minimum length of the
+   * replicas and truncate the rest to that length.
+   **/
   private LocatedBlock recoverBlock(Block block, boolean keepLength,
       DatanodeInfo[] targets, boolean closeFile) throws IOException {
 
@@ -1741,28 +1753,40 @@ public class DataNode extends Configured
       ongoingRecovery.put(block, block);
     }
     try {
-      List<BlockRecord> syncList = new ArrayList<BlockRecord>();
-      long minlength = Long.MAX_VALUE;
       int errorCount = 0;
 
-      //check generation stamps
+      // Number of "replicasBeingWritten" in 0.21 parlance - these are replicas
+      // on DNs that are still alive from when the write was happening
+      int rbwCount = 0;
+      // Number of "replicasWaitingRecovery" in 0.21 parlance - these replicas
+      // have survived a DN restart, and thus might be truncated (eg if the
+      // DN died because of a machine power failure and the subsequent ext3
+      // journal replay truncated the file).
+      int rwrCount = 0;
+      
+      List<BlockRecord> blockRecords = new ArrayList<BlockRecord>();
       for(DatanodeID id : datanodeids) {
         try {
           InterDatanodeProtocol datanode = dnRegistration.equals(id)?
               this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), socketTimeout);
-          BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
-          if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
-            if (keepLength) {
-              if (info.getNumBytes() == block.getNumBytes()) {
-                syncList.add(new BlockRecord(id, datanode, new Block(info)));
-              }
-            }
-            else {
-              syncList.add(new BlockRecord(id, datanode, new Block(info)));
-              if (info.getNumBytes() < minlength) {
-                minlength = info.getNumBytes();
-              }
-            }
+          BlockRecoveryInfo info = datanode.getBlockRecoveryInfo(block);
+          if (info == null) {
+            LOG.info("No block metadata found for block " + block + " on datanode "
+                + id);
+            continue;
+          }
+          if (info.getBlock().getGenerationStamp() < block.getGenerationStamp()) {
+            LOG.info("Only old generation stamp " + info.getBlock().getGenerationStamp()
+                + " found on datanode " + id + " (needed block=" +
+                block + ")");
+            continue;
+          }
+          blockRecords.add(new BlockRecord(id, datanode, info));
+
+          if (info.wasRecoveredOnStartup()) {
+            rwrCount++;
+          } else {
+            rbwCount++;
           }
         } catch (IOException e) {
           ++errorCount;
@@ -1772,6 +1796,34 @@ public class DataNode extends Configured
         }
       }
 
+      // If we *only* have replicas from post-DN-restart, then we should
+      // include them in determining length. Otherwise they might cause us
+      // to truncate too short.
+      boolean shouldRecoverRwrs = (rbwCount == 0);
+      
+      List<BlockRecord> syncList = new ArrayList<BlockRecord>();
+      long minlength = Long.MAX_VALUE;
+      
+      for (BlockRecord record : blockRecords) {
+        BlockRecoveryInfo info = record.info;
+        assert (info != null && info.getBlock().getGenerationStamp() >= block.getGenerationStamp());
+        if (!shouldRecoverRwrs && info.wasRecoveredOnStartup()) {
+          LOG.info("Not recovering replica " + record + " since it was recovered on "
+              + "startup and we have better replicas");
+          continue;
+        }
+        if (keepLength) {
+          if (info.getBlock().getNumBytes() == block.getNumBytes()) {
+            syncList.add(record);
+          }
+        } else {          
+          syncList.add(record);
+          if (info.getBlock().getNumBytes() < minlength) {
+            minlength = info.getBlock().getNumBytes();
+          }
+        }
+      }
+
       if (syncList.isEmpty() && errorCount > 0) {
         throw new IOException("All datanodes failed: block=" + block
             + ", datanodeids=" + Arrays.asList(datanodeids));
@@ -1816,7 +1868,7 @@ public class DataNode extends Configured
 
     for(BlockRecord r : syncList) {
       try {
-        r.datanode.updateBlock(r.block, newblock, closeFile);
+        r.datanode.updateBlock(r.info.getBlock(), newblock, closeFile);
         successList.add(r.id);
       } catch (IOException e) {
         InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
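
The replica filter in the new recoverBlock loop can be restated compactly; the following is a condensed sketch of that logic (BlockRecoveryInfo and Block are the classes touched above, the helper itself is not part of the patch):

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;

    class RecoverBlockFilterSketch {
      /** True if this replica should be added to the sync list. */
      static boolean isCandidate(BlockRecoveryInfo info, Block block,
          boolean keepLength, boolean shouldRecoverRwrs) {
        if (info == null) {
          return false;                                  // no metadata on that DN
        }
        if (info.getBlock().getGenerationStamp() < block.getGenerationStamp()) {
          return false;                                  // stale generation stamp
        }
        if (!shouldRecoverRwrs && info.wasRecoveredOnStartup()) {
          return false;                                  // deprioritized startup-recovered replica
        }
        // With keepLength (append path) only exact-length replicas qualify;
        // otherwise every remaining replica is synced to the minimum length.
        return !keepLength || info.getBlock().getNumBytes() == block.getNumBytes();
      }
    }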

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Sep 10 00:06:41 2011
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
@@ -573,7 +574,7 @@ public class FSDataset implements FSCons
       for (BlockAndFile b : blockSet) {
         File f = b.pathfile;  // full path name of block file
         volumeMap.put(b.block, new DatanodeBlockInfo(this, f));
-        ongoingCreates.put(b.block, new ActiveFile(f));
+        ongoingCreates.put(b.block, ActiveFile.createStartupRecoveryFile(f));
         if (DataNode.LOG.isDebugEnabled()) {
           DataNode.LOG.debug("recoverBlocksBeingWritten for block " + b.block);
         }
@@ -767,19 +768,34 @@ public class FSDataset implements FSCons
     final File file;
     final List<Thread> threads = new ArrayList<Thread>(2);
     private volatile long visibleLength;
-
+    /**
+     * Set to true if this file was recovered during datanode startup.
+     * This may indicate that the file has been truncated (eg during
+     * underlying filesystem journal replay)
+     */
+    final boolean wasRecoveredOnStartup;
+    
     ActiveFile(File f, List<Thread> list) {
-      this(f);
+      this(f, false);
       if (list != null) {
         threads.addAll(list);
       }
       threads.add(Thread.currentThread());
     }
 
-    // no active threads associated with this ActiveFile
-    ActiveFile(File f) {
+    /**
+     * Create an ActiveFile from a file on disk during DataNode startup.
+     * This factory method exists just to make the purpose of this
+     * constructor clear.
+     */
+    public static ActiveFile createStartupRecoveryFile(File f) {
+      return new ActiveFile(f, true);
+    }
+
+    private ActiveFile(File f, boolean recovery) {
       file = f;
       visibleLength = f.length();
+      wasRecoveredOnStartup = recovery;
     }
 
     public long getVisibleLength() {
@@ -809,7 +825,7 @@ public class FSDataset implements FSCons
   }
 
   /** Find the corresponding meta data file from a given block file */
-  static File findMetaFile(final File blockFile) throws IOException {
+  public static File findMetaFile(final File blockFile) throws IOException {
     final String prefix = blockFile.getName() + "_";
     final File parent = blockFile.getParentFile();
     File[] matches = parent.listFiles(new FilenameFilter() {
@@ -1216,12 +1232,31 @@ public class FSDataset implements FSCons
           + ") to newlen (=" + newlen + ")");
     }
 
+    if (newlen == 0) {
+      // Special case for truncating to 0 length, since there's no previous
+      // chunk.
+      RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+      try {
+        //truncate blockFile 
+        blockRAF.setLength(newlen);   
+      } finally {
+        blockRAF.close();
+      }
+      //update metaFile 
+      RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+      try {
+        metaRAF.setLength(BlockMetadataHeader.getHeaderSize());
+      } finally {
+        metaRAF.close();
+      }
+      return;
+    }
     DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); 
     int checksumsize = dcs.getChecksumSize();
     int bpc = dcs.getBytesPerChecksum();
-    long n = (newlen - 1)/bpc + 1;
-    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
-    long lastchunkoffset = (n - 1)*bpc;
+    long newChunkCount = (newlen - 1)/bpc + 1;
+    long newmetalen = BlockMetadataHeader.getHeaderSize() + newChunkCount*checksumsize;
+    long lastchunkoffset = (newChunkCount - 1)*bpc;
     int lastchunksize = (int)(newlen - lastchunkoffset); 
     byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; 
 
@@ -1908,4 +1943,32 @@ public class FSDataset implements FSCons
       return info;
     }
   }
+
+  @Override
+  public synchronized  BlockRecoveryInfo getBlockRecoveryInfo(long blockId) 
+      throws IOException {
+    Block stored = getStoredBlock(blockId);
+
+    if (stored == null) {
+      return null;
+    }
+    
+    ActiveFile activeFile = ongoingCreates.get(stored);
+    boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;
+    
+    
+    BlockRecoveryInfo info = new BlockRecoveryInfo(
+        stored, isRecovery);
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored +
+                " length " + stored.getNumBytes() +
+                " genstamp " + stored.getGenerationStamp());
+    }
+
+    // paranoia! verify that the contents of the stored block
+    // matches the block file on disk.
+    validateBlockMetadata(stored);
+
+    return info;
+  }
 }
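
The renamed newChunkCount variable makes the checksum arithmetic in truncateBlock easier to follow. A worked example of the metadata length calculation (the 7-byte header size is an assumption about BlockMetadataHeader in this branch; the formulas are the ones in the hunk above):

    public class TruncateMetaMath {
      public static void main(String[] args) {
        long newlen = 700;        // bytes kept in the block file
        int bpc = 512;            // bytes per checksum chunk
        int checksumsize = 4;     // CRC32 checksum bytes per chunk
        int headerSize = 7;       // assumed BlockMetadataHeader.getHeaderSize()

        long newChunkCount = (newlen - 1) / bpc + 1;                  // 2 chunks
        long newmetalen = headerSize + newChunkCount * checksumsize;  // 15 bytes
        long lastchunkoffset = (newChunkCount - 1) * bpc;             // 512
        int lastchunksize = (int) (newlen - lastchunkoffset);         // 188 bytes

        System.out.println(newChunkCount + " " + newmetalen + " "
            + lastchunkoffset + " " + lastchunksize);
      }
    }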

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Sep 10 00:06:41 2011
@@ -28,6 +28,7 @@ import java.io.OutputStream;
 
 
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -306,4 +307,6 @@ public interface FSDatasetInterface exte
    * @return true if more then minimum valid volumes left in the FSDataSet
    */
   public boolean hasEnoughResource();
+
+  public BlockRecoveryInfo getBlockRecoveryInfo(long blockId) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Sep 10 00:06:41 2011
@@ -2113,6 +2113,8 @@ public class FSNamesystem implements FSC
    * @param lease The lease for the client creating the file
    */
   void internalReleaseLeaseOne(Lease lease, String src) throws IOException {
+    assert Thread.holdsLock(this);
+
     LOG.info("Recovering lease=" + lease + ", src=" + src);
 
     INodeFile iFile = dir.getFileINode(src);
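
The added assertion documents that internalReleaseLeaseOne must be invoked with the FSNamesystem monitor already held. The same idiom applies to any method with a synchronized-caller contract; a generic sketch (not FSNamesystem code):

    public class HoldsLockSketch {
      private int state;

      public synchronized void publicEntryPoint() {
        internalHelper();          // monitor is already held here
      }

      // Must only be called with 'this' locked; the assert makes the
      // contract explicit when assertions are enabled (java -ea).
      private void internalHelper() {
        assert Thread.holdsLock(this);
        state++;
      }

      public static void main(String[] args) {
        new HoldsLockSketch().publicEntryPoint();
      }
    }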

Added: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryInfo.java?rev=1167409&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryInfo.java Sat Sep 10 00:06:41 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.Writable;
+
+public class BlockRecoveryInfo implements Writable {
+  private Block block;
+  private boolean wasRecoveredOnStartup;
+  
+  public BlockRecoveryInfo() {
+    block = new Block();
+    wasRecoveredOnStartup = false;
+  }
+  
+  public BlockRecoveryInfo(Block block,
+      boolean wasRecoveredOnStartup)
+  {
+    this.block = new Block(block);
+    this.wasRecoveredOnStartup = wasRecoveredOnStartup;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    block.readFields(in);
+    wasRecoveredOnStartup = in.readBoolean();
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    block.write(out);
+    out.writeBoolean(wasRecoveredOnStartup);    
+  }
+
+  public Block getBlock() {
+    return block;
+  }
+  public boolean wasRecoveredOnStartup() {
+    return wasRecoveredOnStartup;
+  }
+  
+  public String toString() {
+    return "BlockRecoveryInfo(block=" + block +
+      " wasRecoveredOnStartup=" + wasRecoveredOnStartup + ")";
+  }
+}
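
BlockRecoveryInfo follows the usual Writable round trip over the inter-datanode RPC. A minimal usage sketch (the Block constructor arguments are illustrative values, not taken from the patch):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;

    public class BlockRecoveryInfoRoundTrip {
      public static void main(String[] args) throws IOException {
        BlockRecoveryInfo original =
            new BlockRecoveryInfo(new Block(1L, 512L, 100L), true);

        // Serialize with write(), as the IPC layer would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance with readFields().
        BlockRecoveryInfo copy = new BlockRecoveryInfo();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);
        // BlockRecoveryInfo(block=... wasRecoveredOnStartup=true)
      }
    }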

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java Sat Sep 10 00:06:41 2011
@@ -46,6 +46,12 @@ public interface InterDatanodeProtocol e
   BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
 
   /**
+   * @return the BlockRecoveryInfo for a block,
+   *         or null if the block is not found
+   */
+  BlockRecoveryInfo getBlockRecoveryInfo(Block block) throws IOException;
+  
+  /**
    * Update the block to the new generation stamp and length.  
    */
   void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Sat Sep 10 00:06:41 2011
@@ -35,16 +35,16 @@ import org.apache.commons.logging.LogFac
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
@@ -55,7 +55,6 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
-import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -227,11 +226,9 @@ public class TestFileAppend4 extends Tes
         
         int actual = b.getNames().length;
         if ( actual < expected ) {
-          if (true || iters > 0) {
-            LOG.info("Not enough replicas for " + b +
-                               " yet. Expecting " + expected + ", got " + 
-                               actual + ".");
-          }
+          LOG.info("Not enough replicas for " + b +
+              " yet. Expecting " + expected + ", got " + 
+              actual + ".");
           replOk = false;
           break;
         }
@@ -260,8 +257,20 @@ public class TestFileAppend4 extends Tes
     AppendTestUtil.check(whichfs, file1, fileSize);
   }
   
-  private void corruptDatanode(int dnNumber) throws Exception {
-    // get the FS data of the 2nd datanode
+  enum CorruptionType {
+    CORRUPT_LAST_CHUNK,
+    TRUNCATE_BLOCK_TO_ZERO,
+    TRUNCATE_BLOCK_HALF;
+  }
+  
+  /**
+   * Corrupt all of the blocks in the blocksBeingWritten dir
+   * for the specified datanode number. Depending on the given
+   * CorruptionType, the last checksum chunk is overwritten with
+   * random data, or the block is truncated to zero or half length.
+   */
+  private void corruptDataNode(int dnNumber, CorruptionType type) throws Exception {
+    // get the FS data of the specified datanode
     File data_dir = new File(System.getProperty("test.build.data"),
                              "dfs/data/data" + 
                              Integer.toString(dnNumber*2 + 1) + 
@@ -271,21 +280,38 @@ public class TestFileAppend4 extends Tes
       // only touch the actual data, not the metadata (with CRC)
       if (block.getName().startsWith("blk_") && 
          !block.getName().endsWith("meta")) {
-        RandomAccessFile file = new RandomAccessFile(block, "rw");
-        FileChannel channel = file.getChannel();
-
-        Random r = new Random();
-        long lastBlockSize = channel.size() % 512;
-        long position = channel.size() - lastBlockSize;
-        int length = r.nextInt((int)(channel.size() - position + 1));
-        byte[] buffer = new byte[length];
-        r.nextBytes(buffer);
-
-        channel.write(ByteBuffer.wrap(buffer), position);
-        System.out.println("Deliberately corrupting file " + block.getName() + 
-                           " at offset " + position +
-                           " length " + length);
-        file.close();
+        if (type == CorruptionType.CORRUPT_LAST_CHUNK) {
+          RandomAccessFile file = new RandomAccessFile(block, "rw");
+          FileChannel channel = file.getChannel();
+          Random r = new Random();
+          long lastBlockSize = channel.size() % 512;
+          long position = channel.size() - lastBlockSize;
+          int length = r.nextInt((int)(channel.size() - position + 1));
+          byte[] buffer = new byte[length];
+          r.nextBytes(buffer);
+
+
+          channel.write(ByteBuffer.wrap(buffer), position);
+          System.out.println("Deliberately corrupting file " + block.getName() + 
+                             " at offset " + position +
+                             " length " + length);
+          file.close();
+
+        } else if (type == CorruptionType.TRUNCATE_BLOCK_TO_ZERO) {
+          LOG.info("Truncating block file at " + block);
+          RandomAccessFile blockFile = new RandomAccessFile(block, "rw");
+          blockFile.setLength(0);
+          blockFile.close();
+          
+          RandomAccessFile metaFile = new RandomAccessFile(
+              FSDataset.findMetaFile(block), "rw");
+          metaFile.setLength(0);
+          metaFile.close();
+        } else if (type == CorruptionType.TRUNCATE_BLOCK_HALF) {
+          FSDatasetTestUtil.truncateBlockFile(block, block.length() / 2);
+        } else {
+          assert false;
+        }
         ++corrupted;
       }
     }
@@ -554,7 +580,7 @@ public class TestFileAppend4 extends Tes
       LOG.info("STOPPED first instance of the cluster");
 
       // give the second datanode a bad CRC
-      corruptDatanode(corruptDN);
+      corruptDataNode(corruptDN, CorruptionType.CORRUPT_LAST_CHUNK);
       
       // restart the cluster
       cluster = new MiniDFSCluster(conf, 3, false, null);
@@ -1052,6 +1078,115 @@ public class TestFileAppend4 extends Tes
   }
 
   /**
+   * Test for what happens when the machine doing the write totally
+   * loses power, and thus when it restarts, the local replica has been
+   * truncated to 0 bytes (very common with journaling filesystems)
+   */
+  public void testTruncatedPrimaryDN() throws Exception {
+    LOG.info("START");
+    runDNRestartCorruptType(CorruptionType.TRUNCATE_BLOCK_TO_ZERO);
+  }
+  
+  /**
+   * Test for what happens when the machine doing the write loses power
+   * but a previous length of the block being written had made it to the
+   * journal
+   */
+  public void testHalfLengthPrimaryDN() throws Exception {
+    LOG.info("START");
+    runDNRestartCorruptType(CorruptionType.TRUNCATE_BLOCK_HALF);
+  }
+  
+  private void runDNRestartCorruptType(CorruptionType corrupt) throws Exception {
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      short rep = 3; // replication
+      assertTrue(BLOCK_SIZE%4 == 0);
+
+      file1 = new Path("/dnDeath.dat");
+
+      // write 1/2 block & close
+      stm = fs1.create(file1, true, 1024, rep, 4096);
+      AppendTestUtil.write(stm, 0, 1024);
+      stm.sync();
+      loseLeases(fs1);
+      
+      DFSOutputStream dfso = (DFSOutputStream)stm.getWrappedStream();
+      dfso.abortForTests();
+      
+      // close the primary DN
+      DataNodeProperties badDN = cluster.stopDataNode(0);
+      
+      // Truncate the block on the primary DN
+      corruptDataNode(0, corrupt);
+
+      // Start the DN back up
+      cluster.restartDataNode(badDN);
+
+      // Recover the lease
+      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+      recoverFile(fs2);
+      
+      assertFileSize(fs2, 1024);
+      checkFile(fs2, 1024);
+    } finally {
+      // explicitly do not shut down fs1, since it's been frozen up by
+      // killing the DataStreamer and not allowing recovery
+      cluster.shutdown();
+    }
+  }
+
+  public void testFullClusterPowerLoss() throws Exception {
+    cluster = new MiniDFSCluster(conf, 2, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      short rep = 2; // replication
+      assertTrue(BLOCK_SIZE%4 == 0);
+
+      file1 = new Path("/dnDeath.dat");
+
+      // write 1/2 block & close
+      stm = fs1.create(file1, true, 1024, rep, 4096);
+      AppendTestUtil.write(stm, 0, 1024);
+      stm.sync();
+      loseLeases(fs1);
+      
+      DFSOutputStream dfso = (DFSOutputStream)stm.getWrappedStream();
+      dfso.abortForTests();
+      
+      // close the DNs
+      DataNodeProperties badDN = cluster.stopDataNode(0);
+      DataNodeProperties badDN2 = cluster.stopDataNode(0); // what was 1 is now 0
+      assertNotNull(badDN);
+      assertNotNull(badDN2);
+      
+      // Truncate one of them as if its journal got corrupted
+      corruptDataNode(0, CorruptionType.TRUNCATE_BLOCK_HALF);
+      
+      // Start the DN back up
+      cluster.restartDataNode(badDN);
+      cluster.restartDataNode(badDN2);
+      
+      // Wait for a heartbeat to make sure we get the initial block
+      // report of the replicasBeingWritten
+      cluster.waitForDNHeartbeat(0, 10000);
+      cluster.waitForDNHeartbeat(1, 10000);
+      
+      // Recover the lease
+      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+      recoverFile(fs2);
+      
+      assertFileSize(fs2, 512);
+      checkFile(fs2, 512);
+    } finally {
+      // explicitly do not shut down fs1, since it's been frozen up by
+      // killing the DataStreamer and not allowing recovery
+      cluster.shutdown();
+    }    
+  }
+
+  /**
    * Mockito answer helper that triggers one latch as soon as the
    * method is called, then waits on another before continuing.
    */

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java Sat Sep 10 00:06:41 2011
@@ -51,12 +51,22 @@ public class TestLeaseRecovery extends j
     return m;
   }
 
+  public void testBlockSynchronization() throws Exception {
+    runTestBlockSynchronization(false);
+  }
+  public void testBlockSynchronizationWithZeroBlock() throws Exception {
+    runTestBlockSynchronization(true);
+  }
+
+
   /**
    * The following test first creates a file with a few blocks.
    * It randomly truncates the replica of the last block stored in each datanode.
    * Finally, it triggers block synchronization to synchronize all stored block.
+   * @param forceOneBlockToZero if true, will truncate one block to 0 length
    */
-  public void testBlockSynchronization() throws Exception {
+  public void runTestBlockSynchronization(boolean forceOneBlockToZero)
+  throws Exception {
     final int ORG_FILE_SIZE = 3000; 
     Configuration conf = new Configuration();
     conf.setLong("dfs.block.size", BLOCK_SIZE);
@@ -101,6 +111,9 @@ public class TestLeaseRecovery extends j
       for(int i = 0; i < REPLICATION_NUM; i++) {
         newblocksizes[i] = AppendTestUtil.nextInt(lastblocksize);
       }
+      if (forceOneBlockToZero) {
+        newblocksizes[0] = 0;
+      }
       DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes)); 
 
       //update blocks with random block sizes

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java Sat Sep 10 00:06:41 2011
@@ -39,9 +39,16 @@ public abstract class FSDatasetTestUtil 
       throw new IOException("Can't find block file for block " +
                             block + " on DN " + dn);
     }
-    File metaFile = ds.findMetaFile(blockFile);
+    File metaFile = FSDataset.findMetaFile(blockFile);
     FSDataset.truncateBlock(blockFile, metaFile,
                             block.getNumBytes(), newLength);
   }
+  
+  public static void truncateBlockFile(File blockFile, long newLength)
+    throws IOException {
+    File metaFile = FSDataset.findMetaFile(blockFile);
+    FSDataset.truncateBlock(blockFile, metaFile,
+                            blockFile.length(), newLength);    
+  }
 
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Sep 10 00:06:41 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -687,4 +688,11 @@ public class SimulatedFSDataset  impleme
   public Block[] getBlocksBeingWrittenReport() {
     return null;
   }
+
+  @Override
+  public BlockRecoveryInfo getBlockRecoveryInfo(long blockId)
+      throws IOException {
+    Block stored = getStoredBlock(blockId);
+    return new BlockRecoveryInfo(stored, false);
+  }
 }