Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2009/09/26 23:23:26 UTC

svn commit: r819215 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/java/org/apache/hadoop/hdfs/server/protocol/ src...

Author: shv
Date: Sat Sep 26 21:23:16 2009
New Revision: 819215

URL: http://svn.apache.org/viewvc?rev=819215&view=rev
Log:
HDFS-644. Lease recovery, concurrency support. Contributed by Konstantin Shvachko.

Added:
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java   (with props)
Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Sat Sep 26 21:23:16 2009
@@ -10,6 +10,8 @@
 
     HDFS-636. SafeMode counts complete blocks only. (shv)
 
+    HDFS-644. Lease recovery, concurrency support. (shv)
+
   NEW FEATURES
 
     HDFS-536. Support hflush at DFSClient. (hairong)

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Sat Sep 26 21:23:16 2009
@@ -42,6 +42,7 @@
    * generated access token is returned as part of the return value.
    * @throws IOException
    */
+  @Deprecated // not used anymore - should be removed
   LocatedBlock recoverBlock(Block block, boolean keepLength,
       DatanodeInfo[] targets) throws IOException;
 }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java Sat Sep 26 21:23:16 2009
@@ -15,17 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.protocol;
 
 import java.io.IOException;
 
 /**
  * Exception indicating that a replica is already being recovered.
  */
-class RecoveryInProgressException extends IOException {
+public class RecoveryInProgressException extends IOException {
   private static final long serialVersionUID = 1L;
 
-  RecoveryInProgressException(String msg) {
+  public RecoveryInProgressException(String msg) {
     super(msg);
   }
 }
\ No newline at end of file

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Sep 26 21:23:16 2009
@@ -34,6 +34,7 @@
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -69,6 +70,7 @@
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -77,6 +79,7 @@
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -913,7 +916,7 @@
       processDistributedUpgradeCommand((UpgradeCommand)cmd);
       break;
     case DatanodeProtocol.DNA_RECOVERBLOCK:
-      recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
+      recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
       break;
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
@@ -1515,16 +1518,16 @@
     return info;
   }
 
-  public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
+  public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
     Daemon d = new Daemon(threadGroup, new Runnable() {
       /** Recover a list of blocks. It is run by the primary datanode. */
       public void run() {
-        for(int i = 0; i < blocks.length; i++) {
+        for(RecoveringBlock b : blocks) {
           try {
-            logRecoverBlock("NameNode", blocks[i], targets[i]);
-            recoverBlock(blocks[i], false, targets[i], true);
+            logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
+            recoverBlock(b);
           } catch (IOException e) {
-            LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
+            LOG.warn("recoverBlocks FAILED: " + b, e);
           }
         }
       }
@@ -1580,9 +1583,9 @@
   }
 
   /** Recover a block */
-  private LocatedBlock recoverBlock(Block block, boolean keepLength,
-      DatanodeInfo[] targets, boolean closeFile) throws IOException {
-
+  private LocatedBlock recoverBlock(RecoveringBlock rBlock) throws IOException {
+    Block block = rBlock.getBlock();
+    DatanodeInfo[] targets = rBlock.getLocations();
     DatanodeID[] datanodeids = (DatanodeID[])targets;
     // If the block is already being recovered, then skip recovering it.
     // This can happen if the namenode and client start recovering the same
@@ -1609,16 +1612,9 @@
               this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
           BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
           if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
-            if (keepLength) {
-              if (info.getNumBytes() == block.getNumBytes()) {
-                syncList.add(new BlockRecord(id, datanode, new Block(info)));
-              }
-            }
-            else {
-              syncList.add(new BlockRecord(id, datanode, new Block(info)));
-              if (info.getNumBytes() < minlength) {
-                minlength = info.getNumBytes();
-              }
+            syncList.add(new BlockRecord(id, datanode, new Block(info)));
+            if (info.getNumBytes() < minlength) {
+              minlength = info.getNumBytes();
             }
           }
         } catch (IOException e) {
@@ -1633,10 +1629,8 @@
         throw new IOException("All datanodes failed: block=" + block
             + ", datanodeids=" + Arrays.asList(datanodeids));
       }
-      if (!keepLength) {
-        block.setNumBytes(minlength);
-      }
-      return syncBlock(block, syncList, targets, closeFile);
+      block.setNumBytes(minlength);
+      return syncBlock(rBlock, syncList);
     } finally {
       synchronized (ongoingRecovery) {
         ongoingRecovery.remove(block);
@@ -1645,20 +1639,22 @@
   }
 
   /** Block synchronization */
-  private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
-      DatanodeInfo[] targets, boolean closeFile) throws IOException {
+  private LocatedBlock syncBlock(RecoveringBlock rBlock,
+                                 List<BlockRecord> syncList) throws IOException {
+    Block block = rBlock.getBlock();
+    long newGenerationStamp = rBlock.getNewGenerationStamp();
     if (LOG.isDebugEnabled()) {
       LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
-          + "), syncList=" + syncList + ", closeFile=" + closeFile);
+          + "), syncList=" + syncList);
     }
 
     //syncList.isEmpty() means that no datanode has the block,
     //so the block can be deleted.
     if (syncList.isEmpty()) {
-      namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
-          DatanodeID.EMPTY_ARRAY);
+      namenode.commitBlockSynchronization(block, newGenerationStamp, 0,
+          true, true, DatanodeID.EMPTY_ARRAY);
       //always return a new access token even if everything else stays the same
-      LocatedBlock b = new LocatedBlock(block, targets);
+      LocatedBlock b = new LocatedBlock(block, rBlock.getLocations());
       if (isAccessTokenEnabled) {
         b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
             .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
@@ -1668,12 +1664,12 @@
 
     List<DatanodeID> successList = new ArrayList<DatanodeID>();
 
-    long generationstamp = namenode.nextGenerationStamp(block);
-    Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
+    Block newblock =
+      new Block(block.getBlockId(), block.getNumBytes(), newGenerationStamp);
 
     for(BlockRecord r : syncList) {
       try {
-        r.datanode.updateBlock(r.block, newblock, closeFile);
+        r.datanode.updateBlock(r.block, newblock, true);
         successList.add(r.id);
       } catch (IOException e) {
         InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
@@ -1685,7 +1681,7 @@
       DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
 
       namenode.commitBlockSynchronization(block,
-          newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
+          newblock.getGenerationStamp(), newblock.getNumBytes(), true, false,
           nlist);
       DatanodeInfo[] info = new DatanodeInfo[nlist.length];
       for (int i = 0; i < nlist.length; i++) {
@@ -1712,10 +1708,12 @@
   
   // ClientDataNodeProtocol implementation
   /** {@inheritDoc} */
+  @SuppressWarnings("deprecation")
   public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
       ) throws IOException {
     logRecoverBlock("Client", block, targets);
-    return recoverBlock(block, keepLength, targets, false);
+    assert false : "ClientDatanodeProtocol.recoverBlock: should never be called.";
+    return null;
   }
 
   private static void logRecoverBlock(String who,
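
For context: the ongoingRecovery map released in the finally block above is what serializes concurrent recovery attempts on the data-node side, so at most one recovery runs per block at a time. Below is a minimal, self-contained sketch of that guard pattern; the class and method names are hypothetical, and the real map is a DataNode field keyed by Block rather than by block id.

import java.util.HashMap;
import java.util.Map;

/** Sketch: allow at most one ongoing recovery per block (hypothetical names). */
class RecoveryGuard {
  private final Map<Long, Object> ongoingRecovery = new HashMap<Long, Object>();

  /** Returns false if a recovery for this block id is already running. */
  boolean tryBegin(long blockId) {
    synchronized (ongoingRecovery) {
      if (ongoingRecovery.containsKey(blockId)) {
        return false;                 // skip: another recovery is in progress
      }
      ongoingRecovery.put(blockId, new Object());
      return true;
    }
  }

  /** Must be called from a finally block, as recoverBlock() does above. */
  void end(long blockId) {
    synchronized (ongoingRecovery) {
      ongoingRecovery.remove(blockId);
    }
  }
}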

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Sep 26 21:23:16 2009
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.metrics.util.MBeanUtil;
@@ -1414,7 +1415,9 @@
   public synchronized void finalizeBlock(Block b) throws IOException {
     ReplicaInfo replicaInfo = getReplicaInfo(b);
     if (replicaInfo.getState() == ReplicaState.FINALIZED) {
-      throw new IOException("Block " + b + " is already finalized.");
+      // this is legal when recovery happens on a file that has
+      // been opened for append but never modified
+      return;
     }
     ReplicaInfo newReplicaInfo = null;
     if (replicaInfo.getState() == ReplicaState.RUR &&
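
The early return above makes finalizeBlock idempotent, which matters when recovery runs on a file that was opened for append but never written to. A toy sketch of the same pattern, with hypothetical types:

/** Sketch: finalizing an already-finalized replica is a no-op, not an error. */
class ReplicaSketch {
  enum State { RBW, RUR, FINALIZED }   // subset of the HDFS replica states

  private State state = State.RBW;

  synchronized void finalizeReplica() {
    if (state == State.FINALIZED) {
      return;  // legal: file was opened for append but never modified
    }
    state = State.FINALIZED;
  }
}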

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java Sat Sep 26 21:23:16 2009
@@ -306,7 +306,7 @@
     // the block is already under construction
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;
     ucBlock.setBlockUCState(s);
-    ucBlock.setLocations(targets);
+    ucBlock.setExpectedLocations(targets);
     ucBlock.setLastRecoveryTime(0);
     return ucBlock;
   }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java Sat Sep 26 21:23:16 2009
@@ -46,6 +46,13 @@
   private long lastRecoveryTime = 0;
 
   /**
+   * The new generation stamp that this block will have
+   * after the recovery succeeds. Also used as a recovery id to identify
+   * the right recovery if any of the abandoned recoveries re-appear.
+   */
+  private long blockRecoveryId = 0;
+
+  /**
    * ReplicaUnderConstruction contains information about replicas while
    * they are under construction.
    * The GS, the length and the state of the replica is as reported by 
@@ -123,7 +130,7 @@
     assert getBlockUCState() != BlockUCState.COMPLETE :
       "BlockInfoUnderConstruction cannot be in COMPLETE state";
     this.blockUCState = state;
-    setLocations(targets);
+    setExpectedLocations(targets);
   }
 
   /**
@@ -144,7 +151,7 @@
     return new BlockInfo(this);
   }
 
-  void setLocations(DatanodeDescriptor[] targets) {
+  void setExpectedLocations(DatanodeDescriptor[] targets) {
     int numLocations = targets == null ? 0 : targets.length;
     this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
     for(int i = 0; i < numLocations; i++)
@@ -156,7 +163,7 @@
    * Create array of expected replica locations
    * (as has been assigned by chooseTargets()).
    */
-  private DatanodeDescriptor[] getExpectedLocations() {
+  DatanodeDescriptor[] getExpectedLocations() {
     int numLocations = replicas == null ? 0 : replicas.size();
     DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
     for(int i = 0; i < numLocations; i++)
@@ -164,7 +171,7 @@
     return locations;
   }
 
-  int getNumLocations() {
+  int getNumExpectedLocations() {
     return replicas == null ? 0 : replicas.size();
   }
 
@@ -181,6 +188,10 @@
     blockUCState = s;
   }
 
+  long getBlockRecoveryId() {
+    return blockRecoveryId;
+  }
+
   /**
    * Commit block's length and generation stamp as reported by the client.
    * Set block state to {@link BlockUCState#COMMITTED}.
@@ -197,9 +208,12 @@
 
   /**
    * Initialize lease recovery for this block.
-   * Find the first alive data-node starting from the previous primary.
+   * Find the first alive data-node starting from the previous primary and
+   * make it primary.
    */
-  void assignPrimaryDatanode() {
+  void initializeBlockRecovery(long recoveryId) {
+    setBlockUCState(BlockUCState.UNDER_RECOVERY);
+    blockRecoveryId = recoveryId;
     if (replicas.size() == 0) {
       NameNode.stateChangeLog.warn("BLOCK*"
         + " INodeFileUnderConstruction.initLeaseRecovery:"
@@ -212,7 +226,7 @@
       if (replicas.get(j).isAlive()) {
         primaryNodeIndex = j;
         DatanodeDescriptor primary = replicas.get(j).getExpectedLocation(); 
-        primary.addBlockToBeRecovered(this, getExpectedLocations());
+        primary.addBlockToBeRecovered(this);
         NameNode.stateChangeLog.info("BLOCK* " + this
           + " recovery started, primary=" + primary);
         return;

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Sat Sep 26 21:23:16 2009
@@ -28,6 +28,8 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
@@ -57,29 +59,36 @@
   }
 
   /** A BlockTargetPair queue. */
-  private static class BlockQueue {
-    private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+  private static class BlockQueue<E> {
+    private final Queue<E> blockq = new LinkedList<E>();
 
     /** Size of the queue */
     synchronized int size() {return blockq.size();}
 
     /** Enqueue */
-    synchronized boolean offer(Block block, DatanodeDescriptor[] targets) { 
-      return blockq.offer(new BlockTargetPair(block, targets));
+    synchronized boolean offer(E e) { 
+      return blockq.offer(e);
     }
 
     /** Dequeue */
-    synchronized List<BlockTargetPair> poll(int numBlocks) {
+    synchronized List<E> poll(int numBlocks) {
       if (numBlocks <= 0 || blockq.isEmpty()) {
         return null;
       }
 
-      List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+      List<E> results = new ArrayList<E>();
       for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
         results.add(blockq.poll());
       }
       return results;
     }
+
+    /**
+     * Returns <tt>true</tt> if the queue contains the specified element.
+     */
+    boolean contains(E e) {
+      return blockq.contains(e);
+    }
   }
 
   private volatile BlockInfo blockList = null;
@@ -89,9 +98,10 @@
   protected boolean needKeyUpdate = false;
 
   /** A queue of blocks to be replicated by this datanode */
-  private BlockQueue replicateBlocks = new BlockQueue();
+  private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
   /** A queue of blocks to be recovered by this datanode */
-  private BlockQueue recoverBlocks = new BlockQueue();
+  private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
+                                new BlockQueue<BlockInfoUnderConstruction>();
   /** A set of blocks to be invalidated by this datanode */
   private Set<Block> invalidateBlocks = new TreeSet<Block>();
 
@@ -279,15 +289,20 @@
    */
   void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
     assert(block != null && targets != null && targets.length > 0);
-    replicateBlocks.offer(block, targets);
+    replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
 
   /**
    * Store block recovery work.
    */
-  void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
-    assert(block != null && targets != null && targets.length > 0);
-    recoverBlocks.offer(block, targets);
+  void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
+    if(recoverBlocks.contains(block)) {
+      // this prevents adding the same block twice to the recovery queue
+      FSNamesystem.LOG.info("Block " + block +
+                            " is already in the recovery queue.");
+      return;
+    }
+    recoverBlocks.offer(block);
   }
 
   /**
@@ -325,10 +340,16 @@
         new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
   }
 
-  BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
-    List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
-    return blocktargetlist == null? null:
-        new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
+  BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
+    List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
+    if(blocks == null)
+      return null;
+    BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size());
+    for(BlockInfoUnderConstruction b : blocks) {
+      brCommand.add(new RecoveringBlock(
+          b, b.getExpectedLocations(), b.getBlockRecoveryId()));
+    }
+    return brCommand;
   }
 
   /**
@@ -444,7 +465,7 @@
                   Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
     FSNamesystem.LOG.debug("Reported block " + block
         + " on " + getName() + " size " + block.getNumBytes()
-        + "replicaState = " + rState);
+        + " replicaState = " + rState);
 
     // find block by blockId
     BlockInfo storedBlock = blockManager.findStoredBlock(block.getBlockId());
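
The BlockQueue change above is a straight generification so that replication work (BlockTargetPair) and recovery work (BlockInfoUnderConstruction) can share one queue type. A standalone sketch of the result; note that the patch leaves contains() unsynchronized, while this sketch synchronizes it for strict thread safety:

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

/** Standalone sketch of the generified per-datanode work queue. */
class BlockQueue<E> {
  private final Queue<E> blockq = new LinkedList<E>();

  synchronized int size() { return blockq.size(); }

  synchronized boolean offer(E e) { return blockq.offer(e); }

  /** Dequeue up to numItems elements; null if nothing is queued. */
  synchronized List<E> poll(int numItems) {
    if (numItems <= 0 || blockq.isEmpty()) {
      return null;
    }
    List<E> results = new ArrayList<E>();
    for (; !blockq.isEmpty() && numItems > 0; numItems--) {
      results.add(blockq.poll());
    }
    return results;
  }

  // synchronized here; the patch leaves this method unsynchronized
  synchronized boolean contains(E e) { return blockq.contains(e); }
}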

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Sat Sep 26 21:23:16 2009
@@ -528,7 +528,7 @@
                                       clientMachine, 
                                       null);
             fsDir.replaceNode(path, node, cons);
-            fsNamesys.leaseManager.addLease(cons.clientName, path);
+            fsNamesys.leaseManager.addLease(cons.getClientName(), path);
           }
           break;
         } 

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Sat Sep 26 21:23:16 2009
@@ -1404,7 +1404,7 @@
       }
       INodeFile oldnode = (INodeFile) old;
       fsDir.replaceNode(path, oldnode, cons);
-      fs.leaseManager.addLease(cons.clientName, path); 
+      fs.leaseManager.addLease(cons.getClientName(), path); 
     }
   }
 

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Sep 26 21:23:16 2009
@@ -918,40 +918,45 @@
         // If the file is under construction , then it must be in our
         // leases. Find the appropriate lease record.
         //
-        Lease lease = leaseManager.getLease(holder);
-        //
-        // We found the lease for this file. And surprisingly the original
-        // holder is trying to recreate this file. This should never occur.
-        //
-        if (lease != null) {
+        Lease lease = leaseManager.getLeaseByPath(src);
+        if (lease == null) {
           throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 " because current leaseholder is trying to recreate file.");
+              "failed to create file " + src + " for " + holder +
+              " on client " + clientMachine + 
+              " because pendingCreates is non-null but no leases found.");
         }
         //
-        // Find the original holder.
+        // We found the lease for this file. And surprisingly the original
+        // holder is trying to recreate this file. This should never occur.
         //
-        lease = leaseManager.getLease(pendingFile.clientName);
-        if (lease == null) {
+        if (lease.getHolder().equals(holder)) {
           throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 " because pendingCreates is non-null but no leases found.");
-        }
+              "failed to create file " + src + " for " + holder +
+              " on client " + clientMachine + 
+              " because current leaseholder is trying to recreate file.");
+        }
+        assert lease.getHolder().equals(pendingFile.getClientName()) :
+          "Current lease holder " + lease.getHolder() +
+          " does not match file creator " + pendingFile.getClientName();
         //
+        // Current lease holder is different from the requester.
         // If the original holder has not renewed in the last SOFTLIMIT 
-        // period, then start lease recovery.
+        // period, then start lease recovery, otherwise fail.
         //
         if (lease.expiredSoftLimit()) {
           LOG.info("startFile: recover lease " + lease + ", src=" + src);
-          internalReleaseLease(lease, src);
-        }
-        throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
-                                               " on client " + clientMachine + 
-                                               ", because this file is already being created by " +
-                                               pendingFile.getClientName() + 
-                                               " on " + pendingFile.getClientMachine());
+          boolean isClosed = internalReleaseLease(lease, src, null);
+          if(!isClosed)
+            throw new RecoveryInProgressException(
+                "Failed to close file " + src +
+                ". Lease recovery is in progress. Try again later.");
+
+        } else
+          throw new AlreadyBeingCreatedException("failed to create file " +
+              src + " for " + holder + " on client " + clientMachine + 
+              ", because this file is already being created by " +
+              pendingFile.getClientName() + 
+              " on " + pendingFile.getClientMachine());
       }
 
       try {
@@ -1004,7 +1009,7 @@
                                         clientMachine,
                                         clientNode);
         dir.replaceNode(src, node, cons);
-        leaseManager.addLease(cons.clientName, src);
+        leaseManager.addLease(cons.getClientName(), src);
 
       } else {
        // Now we can add the name to the filesystem. This file has no
@@ -1020,7 +1025,7 @@
           throw new IOException("DIR* NameSystem.startFile: " +
                                 "Unable to add file to namespace.");
         }
-        leaseManager.addLease(newNode.clientName, src);
+        leaseManager.addLease(newNode.getClientName(), src);
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
                                      +"add "+src+" to namespace for "+holder);
@@ -1632,20 +1637,31 @@
    * Move a file that is being written to be immutable.
    * @param src The filename
    * @param lease The lease for the client creating the file
-   */
-  void internalReleaseLease(Lease lease, String src) throws IOException {
+   * @param recoveryLeaseHolder reassign lease to this holder if the last block
+   *        needs recovery; keep current holder if null.
+   * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal
+   *         replication;<br>
+   *         RecoveryInProgressException if lease recovery is in progress.<br>
+   *         IOException in case of an error.
+   * @return true  if file has been successfully finalized and closed or 
+   *         false if block recovery has been initiated
+   */
+  boolean internalReleaseLease(
+      Lease lease, String src, String recoveryLeaseHolder)
+  throws AlreadyBeingCreatedException,
+         IOException {
     LOG.info("Recovering lease=" + lease + ", src=" + src);
 
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
-      final String message = "DIR* NameSystem.internalReleaseCreate: "
+      final String message = "DIR* NameSystem.internalReleaseLease: "
         + "attempt to release a create lock on "
         + src + " file does not exist.";
       NameNode.stateChangeLog.warn(message);
       throw new IOException(message);
     }
     if (!iFile.isUnderConstruction()) {
-      final String message = "DIR* NameSystem.internalReleaseCreate: "
+      final String message = "DIR* NameSystem.internalReleaseLease: "
         + "attempt to release a create lock on "
         + src + " but file is already closed.";
       NameNode.stateChangeLog.warn(message);
@@ -1653,35 +1669,112 @@
     }
 
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
-    BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+    int nrBlocks = pendingFile.numBlocks();
+    BlockInfo[] blocks = pendingFile.getBlocks();
 
-    // Initialize lease recovery for pendingFile. If there are no blocks 
-    // associated with this file, then reap lease immediately. Otherwise 
-    // renew the lease and trigger lease recovery.
-    if (lastBlock == null) {
-      assert pendingFile.getBlocks().length == 0 :
-        "file is not empty but the last block does not exist";
+    int nrCompleteBlocks;
+    BlockInfo curBlock = null;
+    for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
+      curBlock = blocks[nrCompleteBlocks];
+      if(!curBlock.isComplete())
+        break;
+      assert blockManager.checkMinReplication(curBlock) :
+              "A COMPLETE block is not minimally replicated in " + src;
+    }
+
+    // If there are no incomplete blocks associated with this file,
+    // then reap lease immediately and close the file.
+    if(nrCompleteBlocks == nrBlocks) {
       finalizeINodeFileUnderConstruction(src, pendingFile);
       NameNode.stateChangeLog.warn("BLOCK*"
-        + " internalReleaseLease: No blocks found, lease removed.");
-      return;
+        + " internalReleaseLease: All existing blocks are COMPLETE,"
+        + " lease removed, file closed.");
+      return true;  // closed!
     }
 
-    // setup the last block locations from the blockManager if not known
-    if(lastBlock.getNumLocations() == 0) {
-      DatanodeDescriptor targets[] = blockManager.getNodes(lastBlock);
-      lastBlock.setLocations(targets);
+    // Only the last and the penultimate blocks may be in non COMPLETE state.
+    // If the penultimate block is not COMPLETE, then it must be COMMITTED.
+    if(nrCompleteBlocks < nrBlocks - 2 ||
+       nrCompleteBlocks == nrBlocks - 2 &&
+         curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
+      final String message = "DIR* NameSystem.internalReleaseLease: "
+        + "attempt to release a create lock on "
+        + src + " but file is already closed.";
+      NameNode.stateChangeLog.warn(message);
+      throw new IOException(message);
     }
 
-    // start lease recovery of the last block for this file.
-    lastBlock.assignPrimaryDatanode();
-    leaseManager.renewLease(lease);
+    // now we know that the last block is not COMPLETE, and
+    // that the penultimate block, if it exists, is either COMPLETE or COMMITTED
+    BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+    BlockUCState lastBlockState = lastBlock.getBlockUCState();
+    BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+    BlockUCState penultimateBlockState = (penultimateBlock == null ?
+        BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
+    assert penultimateBlockState == BlockUCState.COMPLETE ||
+           penultimateBlockState == BlockUCState.COMMITTED :
+           "Unexpected state of penultimate block in " + src;
+
+    switch(lastBlockState) {
+    case COMPLETE:
+      assert false : "Already checked that the last block is incomplete";
+      break;
+    case COMMITTED:
+      // Close file if committed blocks are minimally replicated
+      if(blockManager.checkMinReplication(penultimateBlock) &&
+          blockManager.checkMinReplication(lastBlock)) {
+        finalizeINodeFileUnderConstruction(src, pendingFile);
+        NameNode.stateChangeLog.warn("BLOCK*"
+          + " internalReleaseLease: Committed blocks are minimally replicated,"
+          + " lease removed, file closed.");
+        return true;  // closed!
+      }
+      // Cannot close file right now, since some blocks 
+      // are not yet minimally replicated.
+      // This may potentially cause an infinite loop in lease recovery
+      // if there are no valid replicas on data-nodes.
+      String message = "DIR* NameSystem.internalReleaseLease: " +
+          "Failed to release lease for file " + src +
+          ". Committed blocks are waiting to be minimally replicated." +
+          " Try again later.";
+      NameNode.stateChangeLog.warn(message);
+      throw new AlreadyBeingCreatedException(message);
+    case UNDER_CONSTRUCTION:
+    case UNDER_RECOVERY:
+      // setup the last block locations from the blockManager if not known
+      if(lastBlock.getNumExpectedLocations() == 0)
+        lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock));
+      // start recovery of the last block for this file
+      long blockRecoveryId = nextGenerationStamp();
+      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+      lastBlock.initializeBlockRecovery(blockRecoveryId);
+      leaseManager.renewLease(lease);
+      // Cannot close file right now, since the last block requires recovery.
+      // This may potentially cause an infinite loop in lease recovery
+      // if there are no valid replicas on data-nodes.
+      NameNode.stateChangeLog.warn(
+                "DIR* NameSystem.internalReleaseLease: " +
+                "File " + src + " has not been closed." +
+               " Lease recovery is in progress. " +
+                "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+      break;
+    }
+    return false;
   }
 
+  Lease reassignLease(Lease lease, String src, String newHolder,
+                      INodeFileUnderConstruction pendingFile) {
+    if(newHolder == null)
+      return lease;
+    pendingFile.setClientName(newHolder);
+    return leaseManager.reassignLease(lease, src, newHolder);
+  }
+
+
   private void finalizeINodeFileUnderConstruction(
       String src,
       INodeFileUnderConstruction pendingFile) throws IOException {
-    leaseManager.removeLease(pendingFile.clientName, src);
+    leaseManager.removeLease(pendingFile.getClientName(), src);
 
     // complete the penultimate block
     blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
@@ -1715,11 +1808,20 @@
       throw new IOException("Block (=" + lastblock + ") not found");
     }
     INodeFile iFile = oldblockinfo.getINode();
-    if (!iFile.isUnderConstruction()) {
+    if (!iFile.isUnderConstruction() || oldblockinfo.isComplete()) {
       throw new IOException("Unexpected block (=" + lastblock
           + ") since the file (=" + iFile.getLocalName()
           + ") is not under construction");
     }
+
+    long recoveryId =
+      ((BlockInfoUnderConstruction)oldblockinfo).getBlockRecoveryId();
+    if(recoveryId != newgenerationstamp) {
+      throw new IOException("The recovery id " + newgenerationstamp
+          + " does not match current recovery id "
+          + recoveryId + " for block " + lastblock); 
+    }
+        
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
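
Condensed, the new internalReleaseLease above reduces to a three-way decision on the state of the last block, once files whose blocks are all COMPLETE have been closed outright. A hedged distillation of that contract, with plain IOException standing in for AlreadyBeingCreatedException:

import java.io.IOException;

class LeaseReleaseSketch {
  enum BlockUCState { COMPLETE, COMMITTED, UNDER_CONSTRUCTION, UNDER_RECOVERY }

  /** true = file closed now; false = block recovery initiated instead. */
  static boolean canCloseNow(BlockUCState lastBlock, boolean minReplicated)
      throws IOException {
    switch (lastBlock) {
    case COMPLETE:
      // unreachable: files whose blocks are all COMPLETE were closed earlier
      throw new AssertionError("last block cannot be COMPLETE here");
    case COMMITTED:
      if (minReplicated) {
        return true;                    // finalize and close the file
      }
      throw new IOException(            // AlreadyBeingCreatedException in HDFS
          "committed blocks are waiting to be minimally replicated");
    default:                            // UNDER_CONSTRUCTION or UNDER_RECOVERY
      return false;                     // initializeBlockRecovery(new GS)
    }
  }
}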
 
 

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Sat Sep 26 21:23:16 2009
@@ -25,7 +25,7 @@
 
 
 class INodeFileUnderConstruction extends INodeFile {
-  final String clientName;         // lease holder
+  private String clientName;          // lease holder
   private final String clientMachine;
   private final DatanodeDescriptor clientNode; // if client is a cluster node too.
   
@@ -64,6 +64,10 @@
     return clientName;
   }
 
+  void setClientName(String clientName) {
+    this.clientName = clientName;
+  }
+
   String getClientMachine() {
     return clientMachine;
   }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Sat Sep 26 21:23:16 2009
@@ -102,7 +102,7 @@
   /**
    * Adds (or re-adds) the lease for the specified file.
    */
-  synchronized void addLease(String holder, String src) {
+  synchronized Lease addLease(String holder, String src) {
     Lease lease = getLease(holder);
     if (lease == null) {
       lease = new Lease(holder);
@@ -113,6 +113,7 @@
     }
     sortedLeasesByPath.put(src, lease);
     lease.paths.add(src);
+    return lease;
   }
 
   /**
@@ -143,11 +144,22 @@
   }
 
   /**
+   * Reassign lease for file src to the new holder.
+   */
+  synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+    assert newHolder != null : "new lease holder is null";
+    if (lease != null) {
+      removeLease(lease, src);
+    }
+    return addLease(newHolder, src);
+  }
+
+  /**
    * Finds the pathname for the specified pendingFile
    */
   synchronized String findPath(INodeFileUnderConstruction pendingFile
       ) throws IOException {
-    Lease lease = getLease(pendingFile.clientName);
+    Lease lease = getLease(pendingFile.getClientName());
     if (lease != null) {
       String src = lease.findPath(pendingFile);
       if (src != null) {
@@ -265,7 +277,11 @@
     Collection<String> getPaths() {
       return paths;
     }
-    
+
+    String getHolder() {
+      return holder;
+    }
+
     void replacePath(String oldpath, String newpath) {
       paths.remove(oldpath);
       paths.add(newpath);
@@ -376,7 +392,13 @@
       oldest.getPaths().toArray(leasePaths);
       for(String p : leasePaths) {
         try {
-          fsnamesystem.internalReleaseLease(oldest, p);
+          if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) {
+            LOG.info("Lease recovery for file " + p +
+                          " is complete. File closed.");
+            removing.add(p);
+          } else
+            LOG.info("Started block recovery for file " + p +
+                          " lease " + oldest);
         } catch (IOException e) {
           LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
           removing.add(p);
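
reassignLease above is what lets the lease monitor take over an expired file under the synthetic holder "HDFS_NameNode" (see the FSNamesystem hunk). A toy remove-then-add table capturing its shape; real leases additionally track paths per holder and expiry times:

import java.util.HashMap;
import java.util.Map;

/** Sketch of lease reassignment: recovery hands the file to a new holder. */
class LeaseTableSketch {
  private final Map<String, String> holderByPath = new HashMap<String, String>();

  String addLease(String holder, String src) {
    holderByPath.put(src, holder);
    return holder;
  }

  void removeLease(String src) { holderByPath.remove(src); }

  /** Mirrors reassignLease(): remove the old binding, add the new one. */
  String reassignLease(String src, String newHolder) {
    assert newHolder != null : "new lease holder is null";
    removeLease(src);
    return addLease(newHolder, src);
  }
}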

Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java?rev=819215&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java Sat Sep 26 21:23:16 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * BlockRecoveryCommand is an instruction to a data-node to recover
+ * the specified blocks.
+ *
+ * The data-node that receives this command treats itself as a primary
+ * data-node in the recovery process.
+ *
+ * Block recovery is identified by a recoveryId, which is also the new
+ * generation stamp that the block will have after the recovery succeeds.
+ */
+public class BlockRecoveryCommand extends DatanodeCommand {
+  Collection<RecoveringBlock> recoveringBlocks;
+
+  /**
+   * This is a block with locations from which it should be recovered
+   * and the new generation stamp, which the block will have after 
+   * successful recovery.
+   * 
+   * The new generation stamp of the block also plays the role of the recovery id.
+   */
+  public static class RecoveringBlock extends LocatedBlock {
+    private long newGenerationStamp;
+
+    /**
+     * Create empty RecoveringBlock.
+     */
+    public RecoveringBlock() {
+      super();
+      newGenerationStamp = -1L;
+    }
+
+    /**
+     * Create RecoveringBlock.
+     */
+    public RecoveringBlock(Block b, DatanodeInfo[] locs, long newGS) {
+      super(b, locs, -1, false); // startOffset is unknown
+      this.newGenerationStamp = newGS;
+    }
+
+    /**
+     * Return the new generation stamp of the block,
+     * which also plays the role of the recovery id.
+     */
+    public long getNewGenerationStamp() {
+      return newGenerationStamp;
+    }
+
+    ///////////////////////////////////////////
+    // Writable
+    ///////////////////////////////////////////
+    static {                                      // register a ctor
+      WritableFactories.setFactory
+        (RecoveringBlock.class,
+         new WritableFactory() {
+           public Writable newInstance() { return new RecoveringBlock(); }
+         });
+    }
+
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      out.writeLong(newGenerationStamp);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      newGenerationStamp = in.readLong();
+    }
+  }
+
+  /**
+   * Create empty BlockRecoveryCommand.
+   */
+  public BlockRecoveryCommand() {
+    this(0);
+  }
+
+  /**
+   * Create BlockRecoveryCommand with
+   * the specified capacity for recovering blocks.
+   */
+  public BlockRecoveryCommand(int capacity) {
+    super(DatanodeProtocol.DNA_RECOVERBLOCK);
+    recoveringBlocks = new ArrayList<RecoveringBlock>(capacity);
+  }
+
+  /**
+   * Return the list of recovering blocks.
+   */
+  public Collection<RecoveringBlock> getRecoveringBlocks() {
+    return recoveringBlocks;
+  }
+
+  /**
+   * Add recovering block to the command.
+   */
+  public void add(RecoveringBlock block) {
+    recoveringBlocks.add(block);
+  }
+
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (BlockRecoveryCommand.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new BlockRecoveryCommand(); }
+       });
+  }
+
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(recoveringBlocks.size());
+    for(RecoveringBlock block : recoveringBlocks) {
+      block.write(out);
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int numBlocks = in.readInt();
+    recoveringBlocks = new ArrayList<RecoveringBlock>(numBlocks);
+    for(int i = 0; i < numBlocks; i++) {
+      RecoveringBlock b = new RecoveringBlock();
+      b.readFields(in);
+      add(b);
+    }
+  }
+}

Propchange: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
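
A small round-trip exercise for the new command, using only the constructors and methods added in this file (block id, length, and generation stamps are made up); it mirrors the Writable serialization the heartbeat reply relies on:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;

public class BlockRecoveryCommandRoundTrip {
  public static void main(String[] args) throws IOException {
    Block blk = new Block(1L, 1024L, 100L);        // id, length, old GS
    long newGS = 101L;                             // recovery id == new GS

    BlockRecoveryCommand cmd = new BlockRecoveryCommand(1);
    cmd.add(new RecoveringBlock(blk, new DatanodeInfo[0], newGS));

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    cmd.write(new DataOutputStream(bos));          // serialize

    BlockRecoveryCommand copy = new BlockRecoveryCommand();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray())));  // deserialize

    for (RecoveringBlock b : copy.getRecoveringBlocks()) {
      System.out.println(b.getBlock() + " newGS=" + b.getNewGenerationStamp());
    }
  }
}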

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Sat Sep 26 21:23:16 2009
@@ -35,9 +35,9 @@
  **********************************************************************/
 public interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 21: blockReport() includes under-construction replicas.
+   * 22: BlockRecoveryCommand introduced in reply to sendHeartbeat().
    */
-  public static final long versionID = 21L;
+  public static final long versionID = 22L;
   
   // error code
   final static int NOTIFY = 0;

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java Sat Sep 26 21:23:16 2009
@@ -18,12 +18,10 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -34,6 +32,7 @@
 public class TestLeaseRecovery extends junit.framework.TestCase {
   static final int BLOCK_SIZE = 1024;
   static final short REPLICATION_NUM = (short)3;
+  private static final long LEASE_PERIOD = 300L;
 
   static void checkMetaInfo(Block b, InterDatanodeProtocol idp
       ) throws IOException {
@@ -50,6 +49,15 @@
     return m;
   }
 
+  void waitLeaseRecovery(MiniDFSCluster cluster) {
+    cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD);
+    // wait for the lease to expire
+    try {
+      Thread.sleep(2 * 3000);  // 2 heartbeat intervals
+    } catch (InterruptedException e) {
+    }
+  }
+
   /**
    * The following test first creates a file with a few blocks.
    * It randomly truncates the replica of the last block stored in each datanode.
@@ -96,44 +104,22 @@
         checkMetaInfo(lastblock, idps[i]);
       }
 
-      //setup random block sizes 
-      int lastblocksize = ORG_FILE_SIZE % BLOCK_SIZE;
-      Integer[] newblocksizes = new Integer[REPLICATION_NUM];
-      for(int i = 0; i < REPLICATION_NUM; i++) {
-        newblocksizes[i] = AppendTestUtil.nextInt(lastblocksize);
-      }
-      DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes)); 
 
       DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
       cluster.getNameNode().append(filestr, dfs.dfs.clientName);
 
-      //update blocks with random block sizes
-      long newGS = cluster.getNameNode().nextGenerationStamp(lastblock);
-      Block[] newblocks = new Block[REPLICATION_NUM];
-      for(int i = 0; i < REPLICATION_NUM; i++) {
-        newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
-            newGS);
-        idps[i].updateBlock(lastblock, newblocks[i], false);
-        checkMetaInfo(newblocks[i], idps[i]);
-      }
-      cluster.getNameNode().commitBlockSynchronization(lastblock, newGS, 
-          lastblocksize, false, false, new DatanodeID[]{});
-
-      //block synchronization
-      final int primarydatanodeindex = AppendTestUtil.nextInt(datanodes.length);
-      DataNode.LOG.info("primarydatanodeindex  =" + primarydatanodeindex);
-      DataNode primary = datanodes[primarydatanodeindex];
-      DataNode.LOG.info("primary.dnRegistration=" + primary.dnRegistration);
-      primary.recoverBlocks(new Block[]{lastblock}, new DatanodeInfo[][]{datanodeinfos}).join();
+      // expire lease to trigger block recovery.
+      waitLeaseRecovery(cluster);
 
       BlockMetaDataInfo[] updatedmetainfo = new BlockMetaDataInfo[REPLICATION_NUM];
-      int minsize = min(newblocksizes);
-      long currentGS = cluster.getNamesystem().getGenerationStamp();
-      lastblock.setGenerationStamp(currentGS);
+      long oldSize = lastblock.getNumBytes();
+      lastblock = TestInterDatanodeProtocol.getLastLocatedBlock(
+          dfs.dfs.getNamenode(), filestr).getBlock();
+      long currentGS = lastblock.getGenerationStamp();
       for(int i = 0; i < REPLICATION_NUM; i++) {
         updatedmetainfo[i] = idps[i].getBlockMetaDataInfo(lastblock);
         assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId());
-        assertEquals(minsize, updatedmetainfo[i].getNumBytes());
+        assertEquals(oldSize, updatedmetainfo[i].getNumBytes());
         assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp());
       }
     }

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Sat Sep 26 21:23:16 2009
@@ -56,6 +56,7 @@
   //  conf.setInt("io.bytes.per.checksum", 16);
 
     MiniDFSCluster cluster = null;
+    DistributedFileSystem dfs = null;
     byte[] actual = new byte[FILE_SIZE];
 
     try {
@@ -63,7 +64,7 @@
       cluster.waitActive();
 
       //create a file
-      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      dfs = (DistributedFileSystem)cluster.getFileSystem();
       // create a random file name
       String filestr = "/foo" + AppendTestUtil.nextInt();
       System.out.println("filestr=" + filestr);
@@ -129,10 +130,9 @@
           + "Validating its contents now...");
 
       // verify that file-size matches
+      long fileSize = dfs.getFileStatus(filepath).getLen();
       assertTrue("File should be " + size + " bytes, but is actually " +
-                 " found to be " + dfs.getFileStatus(filepath).getLen() +
-                 " bytes",
-                 dfs.getFileStatus(filepath).getLen() == size);
+                 " found to be " + fileSize + " bytes", fileSize == size);
 
       // verify that there is enough data to read.
       System.out.println("File size is good. Now validating sizes from datanodes...");
@@ -142,6 +142,7 @@
     }
     finally {
       try {
+        if(dfs != null) dfs.close();
         if (cluster != null) {cluster.shutdown();}
       } catch (Exception e) {
         // ignore

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Sat Sep 26 21:23:16 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery.Info;
 import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;