Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/06/02 20:43:02 UTC

svn commit: r662513 [1/2] - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/

Author: dhruba
Date: Mon Jun  2 11:43:01 2008
New Revision: 662513

URL: http://svn.apache.org/viewvc?rev=662513&view=rev
Log:
HADOOP-3310. The namenode instructs the primary datanode to do lease
recovery. The block gets a new generation stamp.
(Tsz Wo (Nicholas), SZE via dhruba)


Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreationNamenodeRestart.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStamp.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/StringBytesWritable.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
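
For orientation, the pieces below wire up the following flow: when a write
pipeline fails, DFSClient picks the first surviving datanode as the primary
and invokes the new ClientDatanodeProtocol.recoverBlock on it; the primary
obtains a fresh generation stamp from the namenode
(DatanodeProtocol.nextGenerationStamp), restamps and, where needed, truncates
every surviving replica (InterDatanodeProtocol.updateBlock backed by
FSDataset.updateBlock), and reports the agreed length and stamp back via
DatanodeProtocol.commitBlockSynchronization. A minimal sketch of the primary's
side, assuming package-private access to these types; the committed logic
lives in LeaseManager.recoverBlock, which is in part 2 of this mail:

    package org.apache.hadoop.dfs;

    import java.io.IOException;

    // Illustrative only -- not the committed LeaseManager code.
    class RecoverySequenceSketch {
      static void recoverOnPrimary(DatanodeProtocol namenode,
          InterDatanodeProtocol[] replicas, Block block, long syncedLength,
          DatanodeID[] survivors) throws IOException {
        long newGS = namenode.nextGenerationStamp();  // cluster-unique stamp
        Block newblock = new Block(block.getBlockId(), syncedLength, newGS);
        for (InterDatanodeProtocol replica : replicas) {
          replica.updateBlock(block, newblock);       // restamp, maybe truncate
        }
        namenode.commitBlockSynchronization(block, newGS, syncedLength,
            true /*closeFile*/, false /*deleteblock*/, survivors);
      }
    }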

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jun  2 11:43:01 2008
@@ -182,6 +182,10 @@
     HADOOP-3434. Retain the cause of the bind failure in Server::bind.
     (Steve Loughran via cdouglas)
 
+    HADOOP-3310. The namenode instructs the primary datanode to do lease
+    recovery. The block gets a new generation stamp.
+    (Tsz Wo (Nicholas), SZE via dhruba)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java Mon Jun  2 11:43:01 2008
@@ -25,7 +25,7 @@
  * long.
  *
  **************************************************/
-class Block implements Writable, Comparable {
+class Block implements Writable, Comparable<Block> {
 
   static {                                      // register a ctor
     WritableFactories.setFactory
@@ -51,43 +51,29 @@
     }
   }
 
+  static long filename2id(String name) {
+    return Long.parseLong(name.substring("blk_".length()));
+  }
+
   long blkid;
   long len;
   long generationStamp;
 
-  /**
-   */
-  public Block() {
-    this.blkid = 0;
-    this.len = 0;
-    this.generationStamp = 0;
-  }
+  Block() {this(0, 0, 0);}
 
-  /**
-   */
-  public Block(final long blkid, final long len, final long generationStamp) {
-    this.blkid = blkid;
-    this.len = len;
-    this.generationStamp = generationStamp;
+  Block(final long blkid, final long len, final long generationStamp) {
+    set(blkid, len, generationStamp);
   }
 
-  /**
-   */
-  public Block(Block blk) {
-    this.blkid = blk.blkid;
-    this.len = blk.len;
-    this.generationStamp = blk.generationStamp;
-  }
+  Block(final long blkid) {this(blkid, 0, GenerationStamp.WILDCARD_STAMP);}
+
+  Block(Block blk) {this(blk.blkid, blk.len, blk.generationStamp);}
 
   /**
    * Find the blockid from the given filename
    */
   public Block(File f, long len, long genstamp) {
-    String name = f.getName();
-    name = name.substring("blk_".length());
-    this.blkid = Long.parseLong(name);
-    this.len = len;
-    this.generationStamp = genstamp;
+    this(filename2id(f.getName()), len, genstamp);
   }
 
   public void set(long blkid, long len, long genStamp) {
@@ -147,31 +133,43 @@
   /////////////////////////////////////
   // Comparable
   /////////////////////////////////////
-  public int compareTo(Object o) {
-    Block b = (Block) o;
+  static void validateGenerationStamp(long generationstamp) {
+    if (generationstamp == GenerationStamp.WILDCARD_STAMP) {
+      throw new IllegalStateException("generationStamp (=" + generationstamp
+          + ") == GenerationStamp.WILDCARD_STAMP");
+    }    
+  }
+
+  /** {@inheritDoc} */
+  public int compareTo(Block b) {
+    //Wildcard generationStamp is NOT ALLOWED here
+    validateGenerationStamp(this.generationStamp);
+    validateGenerationStamp(b.generationStamp);
+
     if (blkid < b.blkid) {
       return -1;
     } else if (blkid == b.blkid) {
-      if (generationStamp < b.generationStamp) {
-        return -1;
-      } else if (generationStamp == b.generationStamp) {
-        return 0;
-      } else {
-        return 1;
-      }
+      return GenerationStamp.compare(generationStamp, b.generationStamp);
     } else {
       return 1;
     }
   }
+
+  /** {@inheritDoc} */
   public boolean equals(Object o) {
     if (!(o instanceof Block)) {
       return false;
     }
-    return blkid == ((Block)o).blkid &&
-           generationStamp == ((Block)o).generationStamp;
+    final Block that = (Block)o;
+    //Wildcard generationStamp is ALLOWED here
+    return this.blkid == that.blkid
+      && GenerationStamp.equalsWithWildcard(
+          this.generationStamp, that.generationStamp);
   }
-    
+
+  /** {@inheritDoc} */
   public int hashCode() {
+    //GenerationStamp is IRRELEVANT and should not be used here
     return 37 * 17 + (int) (blkid^(blkid>>>32));
   }
 }
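
The wildcard stamp used above gives Block two deliberately different equality
notions: equals() and hashCode() ignore or wildcard the stamp so a bare block
id can act as a lookup key, while compareTo() insists on a real stamp. A small
sketch of the consequence, assuming package-private access to Block and that
GenerationStamp.equalsWithWildcard treats the wildcard as matching any stamp:

    package org.apache.hadoop.dfs;

    // Illustrative only: the lookup-key pattern behind the new Block(long).
    class BlockWildcardSketch {
      public static void main(String[] args) {
        Block stored = new Block(42L, 1024L, 7L);  // id, length, genstamp
        Block key = new Block(42L);                // wildcard stamp

        // equals() tolerates the wildcard and hashCode() ignores the stamp,
        // so 'key' can find 'stored' in hash-based maps such as volumeMap.
        System.out.println(key.equals(stored));                   // true
        System.out.println(key.hashCode() == stored.hashCode());  // true

        // compareTo() refuses wildcards: ordering by stamp is undefined.
        try {
          key.compareTo(stored);
        } catch (IllegalStateException expected) {
          System.out.println("wildcard rejected in compareTo");
        }
      }
    }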

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java Mon Jun  2 11:43:01 2008
@@ -18,6 +18,9 @@
 package org.apache.hadoop.dfs;
 
 import java.io.*;
+import java.util.List;
+
+import org.apache.hadoop.dfs.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.io.*;
 
 abstract class DatanodeCommand implements Writable {
@@ -101,12 +104,17 @@
   /**
    * Create BlockCommand for transferring blocks to another datanode
    * @param blocks    blocks to be transferred 
-   * @param targets   nodes to transfer
    */
-  BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
-    super( DatanodeProtocol.DNA_TRANSFER);
-    this.blocks = blocks;
-    this.targets = targets;
+  BlockCommand(int action, List<BlockTargetPair> blocktargetlist) {
+    super(action);
+
+    blocks = new Block[blocktargetlist.size()]; 
+    targets = new DatanodeInfo[blocks.length][];
+    for(int i = 0; i < blocks.length; i++) {
+      BlockTargetPair p = blocktargetlist.get(i);
+      blocks[i] = p.block;
+      targets[i] = p.targets;
+    }
   }
 
   private static final DatanodeInfo[][] EMPTY_TARGET = {};

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java Mon Jun  2 11:43:01 2008
@@ -32,25 +32,16 @@
     WritableFactories.setFactory(BlockMetaDataInfo.class, FACTORY);
   }
 
-  /** get BlockMetaDataInfo from the data set and the block scanner */
-  static BlockMetaDataInfo getBlockMetaDataInfo(Block b,
-      FSDatasetInterface dataset, DataBlockScanner blockscanner
-      ) throws IOException {
-    BlockMetaDataInfo info = new BlockMetaDataInfo();
-    info.blkid = b.getBlockId();
-    info.lastScanTime = blockscanner.getLastScanTime(b);
-    info.len = dataset.getLength(b);
-    //TODO: get generation stamp here
-    return info;
-  }
-
   private long lastScanTime;
 
   public BlockMetaDataInfo() {}
 
-  long getLastScanTime() {return lastScanTime;}
+  BlockMetaDataInfo(Block b, long lastScanTime) {
+    super(b);
+    this.lastScanTime = lastScanTime;
+  }
 
-  long getGenerationStamp() {return generationStamp;}
+  long getLastScanTime() {return lastScanTime;}
 
   /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java Mon Jun  2 11:43:01 2008
@@ -138,6 +138,23 @@
       return 0;
     }
 
+    /** Update this object */
+    void update(long newgenerationstamp, long newlength,
+        DatanodeDescriptor[] newtargets) {
+      //remove all nodes  
+      for(int n = numNodes(); n >= 0; ) {
+        removeNode(--n);
+      }
+
+      //add all targets  
+      for(DatanodeDescriptor d : newtargets) {
+        addNode(d);
+      }
+
+      generationStamp = newgenerationstamp;
+      len = newlength;
+    }
+
     /**
      * Add data-node this block belongs to.
      */
@@ -156,7 +173,11 @@
      * Remove data-node from the block.
      */
     boolean removeNode(DatanodeDescriptor node) {
-      int dnIndex = this.findDatanode(node);
+      return removeNode(findDatanode(node));
+    }
+
+    /** Remove the indexed datanode from the block. */
+    boolean removeNode(int dnIndex) {
       if(dnIndex < 0) // the node is not found
         return false;
       assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 

Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java?rev=662513&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java Mon Jun  2 11:43:01 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/** A client-datanode protocol for block recovery
+ */
+interface ClientDatanodeProtocol extends VersionedProtocol {
+  public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
+
+  /**
+   * 1: start of time
+   */
+  public static final long versionID = 1L;
+
+  /** Start generation-stamp recovery for the specified block
+   * @param block the specified block
+   * @param targets the list of possible locations of the specified block
+   * @return the block with an updated generation stamp if recovery
+   * succeeded, or null if the block had no data and was deleted.
+   * @throws IOException
+   */
+  Block recoverBlock(Block block, DatanodeInfo[] targets) throws IOException;
+}
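
Callers reach this interface over Hadoop RPC. The sketch below mirrors what
DFSClient does later in this patch -- build a proxy from the datanode's host
and IPC port, call recoverBlock, release the proxy -- and is a sketch only,
minus DFSClient's retry bookkeeping:

    package org.apache.hadoop.dfs;

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.net.NetUtils;

    class RecoverBlockCaller {
      static Block recover(DatanodeID primary, Block block,
          DatanodeInfo[] targets, Configuration conf) throws IOException {
        InetSocketAddress addr = NetUtils.createSocketAddr(
            primary.getHost() + ":" + primary.getIpcPort());
        ClientDatanodeProtocol proxy = (ClientDatanodeProtocol)
            RPC.waitForProxy(ClientDatanodeProtocol.class,
                ClientDatanodeProtocol.versionID, addr, conf);
        try {
          return proxy.recoverBlock(block, targets);  // null => block deleted
        } finally {
          RPC.stopProxy(proxy);                       // always release it
        }
      }
    }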

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Jun  2 11:43:01 2008
@@ -128,6 +128,18 @@
     return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
         rpcNamenode, methodNameToPolicyMap);
   }
+
+  static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf) throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+      datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+    if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
+      ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
+    }
+    return (ClientDatanodeProtocol)RPC.waitForProxy(ClientDatanodeProtocol.class,
+        ClientDatanodeProtocol.versionID, addr, conf);
+  }
         
   /** 
    * Create a new DFSClient connected to the default namenode.
@@ -1715,6 +1727,8 @@
     private long artificialSlowdown = 0;
     private long lastFlushOffset = -1; // offset when flush was invoked
     private boolean persistBlocks = false; // persist blocks on namenode
+    private int recoveryErrorCount = 0; // number of times block recovery failed
+    private int maxRecoveryErrorCount = 5; // try block recovery 5 times
 
     private class Packet {
       ByteBuffer buffer;           // only one of buf and buffer is non-null
@@ -1839,15 +1853,16 @@
           synchronized (dataQueue) {
 
             // process IO errors if any
-            processDatanodeError();
+            boolean doSleep = processDatanodeError();
 
             // wait for a packet to be sent.
-            while (!closed && !hasError && clientRunning 
-                   && dataQueue.size() == 0) {
+            while ((!closed && !hasError && clientRunning 
+                   && dataQueue.size() == 0) || doSleep) {
               try {
                 dataQueue.wait(1000);
               } catch (InterruptedException  e) {
               }
+              doSleep = false;
             }
             if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {
               continue;
@@ -2053,16 +2068,17 @@
     }
 
     // If this stream has encountered any errors so far, shutdown 
-    // threads and mark stream as closed.
+    // threads and mark stream as closed. Returns true if we should
+    // sleep for a while after returning from this call.
     //
-    private void processDatanodeError() {
+    private boolean processDatanodeError() {
       if (!hasError) {
-        return;
+        return false;
       }
       if (response != null) {
         LOG.info("Error Recovery for block " + block +
                  " waiting for responder to exit. ");
-        return;
+        return true;
       }
       String msg = "Error Recovery for block " + block +
                    " bad datanode[" + errorIndex + "]";
@@ -2094,7 +2110,7 @@
                                           "Aborting...");
           closed = true;
           streamer.close();
-          return;
+          return false;
         }
         StringBuilder pipelineMsg = new StringBuilder();
         for (int j = 0; j < nodes.length; j++) {
@@ -2109,7 +2125,7 @@
                                           pipeline + " are bad. Aborting...");
           closed = true;
           streamer.close();
-          return;
+          return false;
         }
         LOG.warn("Error Recovery for block " + block +
                  " in pipeline " + pipeline + 
@@ -2124,9 +2140,47 @@
         for (int i = errorIndex; i < (nodes.length-1); i++) {
           newnodes[i] = nodes[i+1];
         }
-        nodes = newnodes;
+
+        // Tell the primary datanode to do error recovery 
+        // by stamping appropriate generation stamps.
+        //
+        Block newBlock = null;
+        ClientDatanodeProtocol datanodeRPC =  null;
+        try {
+          datanodeRPC = createClientDatanodeProtocolProxy(newnodes[0], conf);
+          newBlock = datanodeRPC.recoverBlock(block, newnodes);
+        } catch (IOException e) {
+          recoveryErrorCount++;
+          if (recoveryErrorCount > maxRecoveryErrorCount) {
+            String emsg = "Error Recovery for block " + block + " failed " +
+                          " because recovery from primary datanode " +
+                          newnodes[0] + " failed " + recoveryErrorCount + 
+                          " times. Aborting...";
+            LOG.warn(emsg);
+            lastException = new IOException(emsg);
+            closed = true;
+            streamer.close();
+            return false;       // abort with IOexception
+          } 
+          LOG.warn("Error Recovery for block " + block + " failed " +
+                   " because recovery from primary datanode " +
+                   newnodes[0] + " failed " + recoveryErrorCount +
+                   " times. Will retry...");
+          return true;          // sleep when we return from here
+        } finally {
+          RPC.stopProxy(datanodeRPC);
+        }
+        recoveryErrorCount = 0; // block recovery successful
+
+        // If the block recovery generated a new generation stamp, use that
+        // from now on.
+        //
+        if (newBlock != null) {
+          block = newBlock;
+        }
 
         // setup new pipeline
+        nodes = newnodes;
         hasError = false;
         errorIndex = 0;
         success = createBlockOutputStream(nodes, src, true);
@@ -2134,6 +2188,7 @@
 
       response = new ResponseProcessor(nodes);
       response.start();
+      return false; // do not sleep, continue processing
     }
 
     private void isClosed() throws IOException {
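
The boolean now returned by processDatanodeError turns the streamer loop into
a bounded retry: each failed recoverBlock call bumps recoveryErrorCount, a
true return makes the loop sleep about a second in dataQueue.wait(1000), and
after maxRecoveryErrorCount failures the stream aborts with the last
exception. The same contract in isolation, with hypothetical names:

    import java.io.IOException;

    // Standalone restatement of the retry contract; not DFSClient code.
    final class BoundedRetrySketch {
      interface Attempt { void run() throws IOException; }

      static void runWithRetries(Attempt a, int maxErrors, long sleepMs)
          throws IOException, InterruptedException {
        int errors = 0;
        while (true) {
          try {
            a.run();
            return;                 // success (DFSClient also resets its
                                    // recoveryErrorCount to 0 here)
          } catch (IOException e) {
            if (++errors > maxErrors) {
              throw e;              // budget exhausted: surface the failure
            }
            Thread.sleep(sleepMs);  // back off before the next attempt
          }
        }
      }
    }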

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Jun  2 11:43:01 2008
@@ -19,7 +19,6 @@
 
 import org.apache.commons.logging.*;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -80,7 +79,8 @@
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
+public class DataNode extends Configured 
+    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
 
   /**
@@ -178,7 +178,7 @@
    */
   DataNode(Configuration conf, 
            AbstractList<File> dataDirs) throws IOException {
-      
+    super(conf);
     datanodeObject = this;
 
     try {
@@ -772,12 +772,11 @@
   private boolean processCommand(DatanodeCommand cmd) throws IOException {
     if (cmd == null)
       return true;
+    final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
-      //
       // Send a copy of a block to another datanode
-      //
-      BlockCommand bcmd = (BlockCommand)cmd;
       transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
       break;
     case DatanodeProtocol.DNA_INVALIDATE:
@@ -785,7 +784,7 @@
       // Some local block(s) are obsolete and can be 
       // safely garbage-collected.
       //
-      Block toDelete[] = ((BlockCommand)cmd).getBlocks();
+      Block toDelete[] = bcmd.getBlocks();
       try {
         if (blockScanner != null) {
           blockScanner.deleteBlocks(toDelete);
@@ -821,6 +820,9 @@
         scheduleBlockReport(initialBlockReportDelay);
       }
       break;
+    case DatanodeProtocol.DNA_RECOVERBLOCK:
+      recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -3046,16 +3048,27 @@
     if (LOG.isDebugEnabled()) {
       LOG.debug("block=" + block);
     }
-    return BlockMetaDataInfo.getBlockMetaDataInfo(block, data, blockScanner);
+    Block stored = data.getStoredBlock(block.blkid);
+    return stored == null?
+        null: new BlockMetaDataInfo(stored, blockScanner.getLastScanTime(stored));
+  }
+
+  Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
+    Daemon d = new Daemon(threadGroup, new Runnable() {
+      public void run() {
+        LeaseManager.recoverBlocks(blocks, targets, namenode, getConf());
+      }
+    });
+    d.start();
+    return d;
   }
 
   /** {@inheritDoc} */
-  public boolean updateGenerationStamp(Block block, GenerationStamp generationstamp) {
+  public void updateBlock(Block oldblock, Block newblock) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("block=" + block + ", generationstamp=" + generationstamp);
+      LOG.debug("oldblock=" + oldblock + ", newblock=" + newblock);
     }
-    //TODO: update generation stamp here
-    return false;
+    data.updateBlock(oldblock, newblock);
   }
 
   /** {@inheritDoc} */
@@ -3063,8 +3076,21 @@
       ) throws IOException {
     if (protocol.equals(InterDatanodeProtocol.class.getName())) {
       return InterDatanodeProtocol.versionID; 
-    } else {
-      throw new IOException("Unknown protocol to name node: " + protocol);
+    } else if (protocol.equals(ClientDatanodeProtocol.class.getName())) {
+      return ClientDatanodeProtocol.versionID; 
+    }
+    throw new IOException("Unknown protocol to " + getClass().getSimpleName()
+        + ": " + protocol);
+  }
+
+  // ClientDataNodeProtocol implementation
+  /** {@inheritDoc} */
+  public Block recoverBlock(Block block, DatanodeInfo[] targets
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("recoverBlock for block " + block);
     }
+    return LeaseManager.recoverBlock(block, targets, namenode, 
+                                     getConf(), false);
   }
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java Mon Jun  2 11:43:01 2008
@@ -124,4 +124,9 @@
     setDetached();
     return true;
   }
+  
+  public String toString() {
+    return getClass().getSimpleName() + "(volume=" + volume
+        + ", file=" + file + ", detached=" + detached + ")";
+  }
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Mon Jun  2 11:43:01 2008
@@ -18,12 +18,10 @@
 package org.apache.hadoop.dfs;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.*;
 
 import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
-import org.apache.hadoop.dfs.DatanodeInfo.AdminStates;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.WritableUtils;
@@ -40,28 +38,63 @@
 
  **************************************************/
 public class DatanodeDescriptor extends DatanodeInfo {
+  /** Block and targets pair */
+  static class BlockTargetPair {
+    final Block block;
+    final DatanodeDescriptor[] targets;    
+
+    BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
+      this.block = block;
+      this.targets = targets;
+    }
+  }
+
+  /** A BlockTargetPair queue. */
+  private static class BlockQueue {
+    private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+
+    /** Size of the queue */
+    synchronized int size() {return blockq.size();}
+
+    /** Enqueue */
+    synchronized boolean offer(Block block, DatanodeDescriptor[] targets) { 
+      return blockq.offer(new BlockTargetPair(block, targets));
+    }
+
+    /** Dequeue */
+    synchronized List<BlockTargetPair> poll(int numTargets) {
+      if (numTargets <= 0 || blockq.isEmpty()) {
+        return null;
+      }
+      else {
+        List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+        for(; !blockq.isEmpty() && numTargets > 0; ) {
+          numTargets -= blockq.peek().targets.length; 
+          if (numTargets >= 0) {
+            results.add(blockq.poll());
+          }
+        }
+        return results;
+      }
+    }
+  }
 
   private volatile BlockInfo blockList = null;
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
 
-  //
-  // List of blocks to be replicated by this datanode
-  // Also, a list of datanodes per block to indicate the target
-  // datanode of this replication.
-  //
-  List<Block> replicateBlocks;
-  List<DatanodeDescriptor[]> replicateTargetSets;
-  Set<Block> invalidateBlocks;
-  boolean processedBlockReport = false;
+  /** A queue of blocks to be replicated by this datanode */
+  private BlockQueue replicateBlocks = new BlockQueue();
+  /** A queue of blocks to be recovered by this datanode */
+  private BlockQueue recoverBlocks = new BlockQueue();
+  /** A set of blocks to be invalidated by this datanode */
+  private Set<Block> invalidateBlocks = new TreeSet<Block>();
 
+  boolean processedBlockReport = false;
   
   /** Default constructor */
-  public DatanodeDescriptor() {
-    super();
-    initWorkLists();
-  }
+  public DatanodeDescriptor() {}
   
   /** DatanodeDescriptor constructor
    * @param nodeID id of the data node
@@ -107,7 +140,6 @@
                             int xceiverCount) {
     super(nodeID);
     updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
-    initWorkLists();
   }
 
   /** DatanodeDescriptor constructor
@@ -128,16 +160,6 @@
                             int xceiverCount) {
     super(nodeID, networkLocation, hostName);
     updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
-    initWorkLists();
-  }
-
-  /*
-   * initialize list of blocks that store work for the datanodes
-   */
-  private void initWorkLists() {
-    replicateBlocks = new ArrayList<Block>();
-    replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
-    invalidateBlocks = new TreeSet<Block>();
   }
 
   /**
@@ -227,10 +249,15 @@
    */
   void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
     assert(block != null && targets != null && targets.length > 0);
-    synchronized (replicateBlocks) {
-      replicateBlocks.add(block);
-      replicateTargetSets.add(targets);
-    }
+    replicateBlocks.offer(block, targets);
+  }
+
+  /**
+   * Store block recovery work.
+   */
+  void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
+    assert(block != null && targets != null && targets.length > 0);
+    recoverBlocks.offer(block, targets);
   }
 
   /**
@@ -245,16 +272,14 @@
     }
   }
 
-  /*
+  /**
    * The number of work items that are pending to be replicated
    */
   int getNumberOfBlocksToBeReplicated() {
-    synchronized (replicateBlocks) {
-      return replicateBlocks.size();
-    }
+    return replicateBlocks.size();
   }
 
-  /*
+  /**
    * The number of block invalidation items that are pending to 
    * be sent to the datanode
    */
@@ -279,36 +304,16 @@
     return processedBlockReport;
   }
 
-  /**
-   * Remove the specified number of target sets
-   */
-  BlockCommand getReplicationCommand(int maxNumTransfers) {
-    synchronized (replicateBlocks) {
-      assert(replicateBlocks.size() == replicateTargetSets.size());
+  BlockCommand getReplicationCommand(int maxTransfers) {
+    List<BlockTargetPair> blocktargetlist = replicateBlocks.poll(maxTransfers);
+    return blocktargetlist == null? null:
+        new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
+  }
 
-      if (maxNumTransfers <= 0 || replicateBlocks.size() == 0) {
-        return null;
-      }
-      int numTransfers = 0;
-      int numBlocks = 0;
-      int i;
-      for (i = 0; i < replicateTargetSets.size() && 
-             numTransfers < maxNumTransfers; i++) {
-        numTransfers += replicateTargetSets.get(i).length;
-      }
-      numBlocks = i;
-      Block[] blocklist = new Block[numBlocks];
-      DatanodeDescriptor targets[][] = new DatanodeDescriptor[numBlocks][];
-
-      for (i = 0; i < numBlocks; i++) {
-        blocklist[i] = replicateBlocks.get(0);
-        targets[i] = replicateTargetSets.get(0);
-        replicateBlocks.remove(0);
-        replicateTargetSets.remove(0);
-      }
-      assert(blocklist.length > 0 && targets.length > 0);
-      return new BlockCommand(blocklist, targets);
-    }
+  BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
+    List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
+    return blocktargetlist == null? null:
+        new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
   }
 
   /**
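
Note the budget rule in BlockQueue.poll(numTargets) above: the budget counts
datanode targets, not blocks, and a pair is dequeued only when it fits
entirely, so an oversized head pair can make poll return an empty list while
leaving the queue untouched. Restated standalone, with plain integers standing
in for targets.length (the numbers are hypothetical):

    import java.util.*;

    final class BudgetPollSketch {
      static List<Integer> poll(Queue<Integer> targetCounts, int budget) {
        List<Integer> taken = new ArrayList<Integer>();
        while (!targetCounts.isEmpty() && budget > 0) {
          budget -= targetCounts.peek();     // charge the head entry
          if (budget >= 0) {
            taken.add(targetCounts.poll());  // it fits: dequeue it
          }                                  // else leave it queued; the loop
        }                                    // then exits since budget <= 0
        return taken;
      }

      public static void main(String[] args) {
        Queue<Integer> q = new LinkedList<Integer>(Arrays.asList(2, 2, 1));
        System.out.println(poll(q, 3));      // [2] -- only the head pair fits
      }
    }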

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java Mon Jun  2 11:43:01 2008
@@ -32,6 +32,7 @@
  * 
  */
 public class DatanodeID implements WritableComparable {
+  static final DatanodeID[] EMPTY_ARRAY = {}; 
 
   protected String name;      /// hostname:portNumber
   protected String storageID; /// unique per cluster storageID

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Mon Jun  2 11:43:01 2008
@@ -31,9 +31,9 @@
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 14: add corrupt field to LocatedBlock
+   * 15: added DNA_RECOVERBLOCK, nextGenerationStamp and commitBlockSynchronization
    */
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
   
   // error code
   final static int NOTIFY = 0;
@@ -51,6 +51,7 @@
   final static int DNA_REGISTER = 4;   // re-register
   final static int DNA_FINALIZE = 5;   // finalize previous upgrade
   final static int DNA_BLOCKREPORT = 6;   // request a block report
+  final static int DNA_RECOVERBLOCK = 7;  // request a block recovery
 
   /** 
    * Register Datanode.
@@ -132,4 +133,17 @@
    * same as {@link ClientProtocol#reportBadBlocks(LocatedBlock[] blocks)}
    */
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
+  
+  /**
+   * @return the next GenerationStamp
+   */
+  public long nextGenerationStamp() throws IOException;
+
+  /**
+   * Commit block synchronization in lease recovery
+   */
+  public void commitBlockSynchronization(Block block,
+      long newgenerationstamp, long newlength,
+      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
+      ) throws IOException;
 }
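
The two booleans on commitBlockSynchronization encode the recovery outcome.
The namenode-side handler is in FSNamesystem in part 2 of this mail, so the
reading below is inferred from the recoverBlock javadoc and should be treated
as hedged: closeFile=true asks the namenode to finalize the file once the
replicas in newtargets are recorded at newlength/newgenerationstamp, and
deleteblock=true reports that no replica kept any data, so the block should
be dropped. A sketch of the latter case (argument values are assumptions):

    package org.apache.hadoop.dfs;

    import java.io.IOException;

    class CommitSyncFlags {
      static void reportEmptyBlock(DatanodeProtocol namenode, Block block)
          throws IOException {
        // No replica had data: ask the namenode to drop the block; the
        // client's recoverBlock then returns null (see ClientDatanodeProtocol).
        namenode.commitBlockSynchronization(block,
            0L /*newgenerationstamp*/, 0L /*newlength*/,
            false /*closeFile*/, true /*deleteblock*/,
            DatanodeID.EMPTY_ARRAY /*newtargets*/);
      }
    }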

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Mon Jun  2 11:43:01 2008
@@ -546,6 +546,11 @@
       }
       threads.add(Thread.currentThread());
     }
+    
+    public String toString() {
+      return getClass().getSimpleName() + "(file=" + file
+          + ", threads=" + threads + ")";
+    }
   } 
   
   static File getMetaFile(File f , Block b) {
@@ -553,9 +558,75 @@
                      "_" + b.getGenerationStamp() + METADATA_EXTENSION ); 
   }
   protected File getMetaFile(Block b) throws IOException {
-    File blockFile = getBlockFile( b );
-    return new File( blockFile.getAbsolutePath() + 
-                     "_" + b.getGenerationStamp() + METADATA_EXTENSION ); 
+    return getMetaFile(getBlockFile(b), b);
+  }
+
+  /** Find the corresponding meta data file from a given block file */
+  private static File findMetaFile(final File blockFile) throws IOException {
+    final String prefix = blockFile.getName() + "_";
+    final File parent = blockFile.getParentFile();
+    File[] matches = parent.listFiles(new FilenameFilter() {
+      public boolean accept(File dir, String name) {
+        return dir.equals(parent)
+            && name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);
+      }
+    });
+
+    if (matches == null || matches.length == 0) {
+      throw new IOException("Meta file not found, blockFile=" + blockFile);
+    }
+    else if (matches.length > 1) {
+      throw new IOException("Found more than one meta files: " 
+          + Arrays.asList(matches));
+    }
+    return matches[0];
+  }
+  
+  /** Parse the generation stamp from the name of the given meta file */
+  private static long parseGenerationStamp(File blockFile, File metaFile
+      ) throws IOException {
+    String metaname = metaFile.getName();
+    String gs = metaname.substring(blockFile.getName().length() + 1,
+        metaname.length() - METADATA_EXTENSION.length());
+    try {
+      return Long.parseLong(gs);
+    } catch(NumberFormatException nfe) {
+      throw (IOException)new IOException("blockFile=" + blockFile
+          + ", metaFile=" + metaFile).initCause(nfe);
+    }
+  }
+
+  File findBlockFile(Block b) {
+    assert b.generationStamp == GenerationStamp.WILDCARD_STAMP;
+
+    File blockfile = null;
+    ActiveFile activefile = ongoingCreates.get(b);
+    if (activefile != null) {
+      blockfile = activefile.file;
+    }
+    if (blockfile == null) {
+      blockfile = getFile(b);
+    }
+    if (blockfile == null) {
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
+        DataNode.LOG.debug("volumeMap=" + volumeMap);
+      }
+    }
+    return blockfile;
+  }
+
+  /** {@inheritDoc} */
+  public Block getStoredBlock(long blkid) throws IOException {
+    Block b = new Block(blkid);
+    File blockfile = findBlockFile(b);
+    if (blockfile == null) {
+      return null;
+    }
+    File metafile = findMetaFile(blockfile);
+    b.generationStamp = parseGenerationStamp(blockfile, metafile);
+    b.len = blockfile.length();
+    return b;
   }
 
   public boolean metaFileExists(Block b) throws IOException {
@@ -624,11 +695,7 @@
    * Find the block's on-disk length
    */
   public long getLength(Block b) throws IOException {
-    File f = validateBlockFile(b);
-    if(f == null) {
-      throw new IOException("Block " + b + " is not valid.");
-    }
-    return f.length();
+    return getBlockFile(b).length();
   }
 
   /**
@@ -637,6 +704,9 @@
   protected synchronized File getBlockFile(Block b) throws IOException {
     File f = validateBlockFile(b);
     if(f == null) {
+      if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
+        InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
+      }
       throw new IOException("Block " + b + " is not valid.");
     }
     return f;
@@ -680,6 +750,126 @@
     return info.detachBlock(block, numLinks);
   }
 
+  static private <T> void updateBlockMap(Map<Block, T> blockmap,
+      Block oldblock, Block newblock) throws IOException {
+    if (blockmap.containsKey(oldblock)) {
+      T value = blockmap.remove(oldblock);
+      blockmap.put(newblock, value);
+    }
+  }
+
+  /** interrupt and wait for all ongoing create threads */
+  private synchronized void interruptOngoingCreates(Block b) {
+    //remove ongoingCreates threads
+    ActiveFile activefile = ongoingCreates.get(b);
+    if (activefile != null) {
+      for(Thread t : activefile.threads) {
+        t.interrupt();
+      }
+      for(Thread t : activefile.threads) {
+        try {
+          t.join();
+        } catch (InterruptedException e) {
+          DataNode.LOG.warn("interruptOngoingCreates: b=" + b
+              + ", activeFile=" + activefile + ", t=" + t, e);
+        }
+      }
+      activefile.threads.clear();
+    }
+  }
+  /** {@inheritDoc} */
+  public synchronized void updateBlock(Block oldblock, Block newblock
+      ) throws IOException {
+    if (oldblock.getBlockId() != newblock.getBlockId()) {
+      throw new IOException("Cannot update oldblock (=" + oldblock
+          + ") to newblock (=" + newblock + ").");
+    }
+
+    File blockFile = findBlockFile(oldblock);
+    interruptOngoingCreates(oldblock);
+    
+    File oldMetaFile = getMetaFile(blockFile, oldblock);
+    long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
+    
+    //rename meta file to a tmp file
+    File tmpMetaFile = new File(oldMetaFile.getParent(),
+        oldMetaFile.getName()+"_tmp");
+    if (!oldMetaFile.renameTo(tmpMetaFile)){
+      throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
+    }
+
+    //update generation stamp
+    if (oldgs > newblock.generationStamp) {
+      throw new IOException("Cannot update block (id=" + newblock.blkid
+          + ") generation stamp from " + oldgs
+          + " to " + newblock.generationStamp);
+    }
+    
+    //update length
+    if (newblock.len > oldblock.len) {
+      throw new IOException("Cannot update block file (=" + blockFile
+          + ") length from " + oldblock.len + " to " + newblock.len);
+    }
+    if (newblock.len < oldblock.len) {
+      truncateBlock(blockFile, tmpMetaFile, oldblock.len, newblock.len);
+    }
+
+    //rename the tmp file to the new meta file (with new generation stamp)
+    File newMetaFile = getMetaFile(blockFile, newblock);
+    if (!tmpMetaFile.renameTo(newMetaFile)) {
+      throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
+    }
+
+    updateBlockMap(ongoingCreates, oldblock, newblock);
+    updateBlockMap(volumeMap, oldblock, newblock);
+  }
+
+  static private void truncateBlock(File blockFile, File metaFile,
+      long oldlen, long newlen) throws IOException {
+    if (newlen == oldlen) {
+      return;
+    }
+    if (newlen > oldlen) {
+      throw new IOException("Cannout truncate block to from oldlen (=" + oldlen
+          + ") to newlen (=" + newlen + ")");
+    }
+
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); 
+    int checksumsize = dcs.getChecksumSize();
+    int bpc = dcs.getBytesPerChecksum();
+    long n = (newlen - 1)/bpc + 1;
+    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
+    long lastchunkoffset = (n - 1)*bpc;
+    int lastchunksize = (int)(newlen - lastchunkoffset); 
+    byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; 
+
+    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+    try {
+      //truncate blockFile 
+      blockRAF.setLength(newlen);
+ 
+      //read last chunk
+      blockRAF.seek(lastchunkoffset);
+      blockRAF.readFully(b, 0, lastchunksize);
+    } finally {
+      blockRAF.close();
+    }
+
+    //compute checksum
+    dcs.update(b, 0, lastchunksize);
+    dcs.writeValue(b, 0, false);
+
+    //update metaFile 
+    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+    try {
+      metaRAF.setLength(newmetalen);
+      metaRAF.seek(newmetalen - checksumsize);
+      metaRAF.write(b, 0, checksumsize);
+    } finally {
+      metaRAF.close();
+    }
+  }
+
   /**
    * Start writing to a block file
    * If isRecovery is true and the block pre-exists, then we kill all
@@ -880,6 +1070,9 @@
     File f = getFile(b);
     if(f != null && f.exists())
       return f;
+    if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
+      InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
+    }
     return null;
   }
 
@@ -1020,12 +1213,4 @@
   public String getStorageInfo() {
     return toString();
   }
-  
-  /**
-   *  A duplicate of {@link #getLength()}
-   */
-  @Deprecated
-  public long getBlockSize(Block b) {
-    return getFile(b).length();
-  }
 }
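
truncateBlock above must keep the meta file consistent with the shortened
data file: the last remaining chunk becomes partial, so its checksum is
recomputed from the re-read chunk bytes and the meta file is cut to
headerSize + n*checksumSize. A worked example of the arithmetic with
hypothetical sizes (bytesPerChecksum=512, checksumSize=4, headerSize=7,
newlen=1000):

    // Standalone restatement of the size arithmetic in truncateBlock.
    final class TruncateMathSketch {
      public static void main(String[] args) {
        long newlen = 1000;
        int bpc = 512, checksumSize = 4, headerSize = 7;
        long n = (newlen - 1) / bpc + 1;                  // 2 chunks remain
        long newmetalen = headerSize + n * checksumSize;  // 7 + 8 = 15 bytes
        long lastchunkoffset = (n - 1) * bpc;             // 512
        int lastchunksize = (int) (newlen - lastchunkoffset);  // 488
        System.out.println(n + " " + newmetalen + " "
            + lastchunkoffset + " " + lastchunksize);     // 2 15 512 488
      }
    }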

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java Mon Jun  2 11:43:01 2008
@@ -89,7 +89,12 @@
    * @throws IOException
    */
   public long getLength(Block b) throws IOException;
-     
+
+  /**
+   * @return the stored block, including its generation stamp and on-disk length.
+   */
+  public Block getStoredBlock(long blkid) throws IOException;
+
   /**
    * Returns an input stream to read the contents of the specified block
    * @param b
@@ -136,6 +141,11 @@
   public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
 
   /**
+   * Update the block to the new generation stamp and length.  
+   */
+  public void updateBlock(Block oldblock, Block newblock) throws IOException;
+  
+  /**
    * Finalizes the block previously opened for writing using writeToBlock.
    * The block size is what is in the parameter b and it must match the amount
    *  of data written

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Mon Jun  2 11:43:01 2008
@@ -127,7 +127,7 @@
   /**
    * Add the given filename to the fs.
    */
-  INode addFile(String path, 
+  INodeFileUnderConstruction addFile(String path, 
                 PermissionStatus permissions,
                 short replication,
                 long preferredBlockSize,
@@ -166,7 +166,6 @@
     }
     // add create file record to log, record new generation stamp
     fsImage.getEditLog().logOpenFile(path, newNode);
-    fsImage.getEditLog().logGenerationStamp(generationStamp);
 
     NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                                   +path+" is added to the file system");
@@ -287,13 +286,14 @@
    */
   void closeFile(String path, INodeFile file) throws IOException {
     waitForReady();
-
     synchronized (rootDir) {
       // file is closed
       fsImage.getEditLog().logCloseFile(path, file);
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
                                     +path+" with "+ file.getBlocks().length 
                                     +" blocks is persisted to the file system");
+      }
     }
   }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Mon Jun  2 11:43:01 2008
@@ -527,7 +527,7 @@
                                         clientMachine, 
                                         null);
               fsDir.replaceNode(path, node, cons);
-              fsNamesys.leaseManager.addLease(path, clientName);
+              fsNamesys.leaseManager.addLease(cons.clientName, path);
             } else if (opcode == OP_CLOSE) {
               //
               // Remove lease if it exists.
@@ -535,7 +535,7 @@
               if (old.isUnderConstruction()) {
                 INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
                                                      old;
-                fsNamesys.leaseManager.removeLease(path, cons.getClientName());
+                fsNamesys.leaseManager.removeLease(cons.clientName, path);
               }
             }
             break;
@@ -573,7 +573,7 @@
             old = fsDir.unprotectedDelete(path, timestamp, null);
             if (old != null && old.isUnderConstruction()) {
               INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
-              fsNamesys.leaseManager.removeLease(path, cons.getClientName());
+              fsNamesys.leaseManager.removeLease(cons.clientName, path);
             }
             break;
           }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Mon Jun  2 11:43:01 2008
@@ -1031,7 +1031,7 @@
       }
       INodeFile oldnode = (INodeFile) old;
       fsDir.replaceNode(path, oldnode, cons);
-      fs.leaseManager.addLease(path, cons.getClientName()); 
+      fs.leaseManager.addLease(cons.clientName, path); 
     }
   }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Jun  2 11:43:01 2008
@@ -96,9 +96,8 @@
   FSDirectory dir;
 
   //
-  // Stores the block-->datanode(s) map.  Updated only in response
-  // to client-sent information.
   // Mapping: Block -> { INode, datanodes, self ref } 
+  // Updated only in response to client-sent information.
   //
   BlocksMap blocksMap = new BlocksMap();
 
@@ -238,9 +237,8 @@
 
   /**
    * The global generation stamp for this file system. 
-   * Valid values start from 1000.
    */
-  private GenerationStamp generationStamp = new GenerationStamp(1000);
+  private final GenerationStamp generationStamp = new GenerationStamp();
 
   // Ask Datanode only up to this many blocks to delete.
   private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
@@ -931,7 +929,7 @@
         // If the file is under construction , then it must be in our
         // leases. Find the appropriate lease record.
         //
-        Lease lease = leaseManager.getLease(holder);
+        Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
         //
         // We found the lease for this file. And surprisingly the original
         // holder is trying to recreate this file. This should never occur.
@@ -945,7 +943,7 @@
         //
         // Find the original holder.
         //
-        lease = leaseManager.getLease(pendingFile.getClientName());
+        lease = leaseManager.getLease(pendingFile.clientName);
         if (lease == null) {
           throw new AlreadyBeingCreatedException(
                                                  "failed to create file " + src + " for " + holder +
@@ -958,8 +956,9 @@
         // to proceed. Otherwise, prevent this request from creating file.
         //
         if (lease.expiredSoftLimit()) {
-          LOG.info("startFile: Removing lease " + lease);
-          leaseManager.removeExpiredLease(lease);
+          LOG.info("startFile: recover lease " + lease + ", src=" + src);
+          internalReleaseLease(lease, src);
+          leaseManager.renewLease(lease);
         } else {
           throw new AlreadyBeingCreatedException(
                                                  "failed to create file " + src + " for " + holder +
@@ -988,8 +987,6 @@
       DatanodeDescriptor clientNode = 
         host2DataNodeMap.getDatanodeByHost(clientMachine);
 
-      leaseManager.addLease(src, holder);
-
       //
       // Now we can add the name to the filesystem. This file has no
       // blocks associated with it.
@@ -997,22 +994,64 @@
       checkFsObjectLimit();
 
       // increment global generation stamp
-      long genstamp = generationStamp.nextStamp();
-
-      INode newNode = dir.addFile(src, permissions,
+      long genstamp = nextGenerationStamp();
+      INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
           replication, blockSize, holder, clientMachine, clientNode, genstamp);
       if (newNode == null) {
         throw new IOException("DIR* NameSystem.startFile: " +
                               "Unable to add file to namespace.");
       }
+      leaseManager.addLease(newNode.clientName, src);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
                                    +ie.getMessage());
       throw ie;
     }
 
-    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
                                   +"add "+src+" to namespace for "+holder);
+    }
+  }
+
+  /** append is not yet ready.  This method is for testing. */
+  void appendFileInternal(String src, String holder, String clientMachine
+      ) throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
+          +src+" for "+holder+" at "+clientMachine);
+    }
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot append to file " + src, safeMode);
+    if (!isValidName(src)) {
+      throw new IOException("Invalid file name: " + src);
+    }
+    if (isPermissionEnabled) {
+      checkPathAccess(src, FsAction.WRITE);
+    }
+
+    try {
+      INodeFile f = dir.getFileINode(src);
+      //assume f != null && !f.isUnderConstruction() && lease does not exist
+      //TODO: remove the assumption 
+
+      DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
+          clientMachine);
+      INodeFileUnderConstruction newnode = f.toINodeFileUnderConstruction(
+          holder, clientMachine, clientNode);
+
+      dir.replaceNode(src, f, newnode);
+      leaseManager.addLease(newnode.clientName, src);
+
+    } catch (IOException ie) {
+      NameNode.stateChangeLog.warn("DIR* NameSystem.appendFile: ", ie);
+      throw ie;
+    }
+
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: "
+          +"add "+src+" to namespace for "+holder);
+    }
   }
 
   /**
@@ -1079,6 +1118,7 @@
 
       // allocate new block record block locations in INode.
       newBlock = allocateBlock(src, pendingFile);
+      pendingFile.targets = targets;
     }
         
     // Create next block
@@ -1108,7 +1148,7 @@
       ) throws IOException {
     INode file = dir.getFileINode(src);
     if (file == null) {
-      Lease lease = leaseManager.getLease(holder);
+      Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
       throw new LeaseExpiredException("No lease on " + src +
                                       " File does not exist. " +
                                       (lease != null ? lease.toString() :
@@ -1116,7 +1156,7 @@
                                        " does not have any open files."));
     }
     if (!file.isUnderConstruction()) {
-      Lease lease = leaseManager.getLease(holder);
+      Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
       throw new LeaseExpiredException("No lease on " + src + 
                                       " File is not open for writing. " +
                                       (lease != null ? lease.toString() :
@@ -1167,30 +1207,13 @@
     } else if (!checkFileProgress(pendingFile, true)) {
       return STILL_WAITING;
     }
-        
-    // The file is no longer pending.
-    // Create permanent INode, update blockmap
-    INodeFile newFile = pendingFile.convertToInodeFile();
-    dir.replaceNode(src, pendingFile, newFile);
 
-    // close file and persist block allocations for this file
-    dir.closeFile(src, newFile);
+    finalizeINodeFileUnderConstruction(src, pendingFile);
 
-    NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
                                   + " blocklist persisted");
-
-    leaseManager.removeLease(src, holder);
-
-    //
-    // REMIND - mjc - this should be done only after we wait a few secs.
-    // The namenode isn't giving datanodes enough time to report the
-    // replicated blocks that are automatically done as part of a client
-    // write.
-    //
-
-    // Now that the file is real, we need to be sure to replicate
-    // the blocks.
-    checkReplicationFactor(newFile);
+    }
     return COMPLETE_SUCCESS;
   }
 
@@ -1291,27 +1314,15 @@
    * dumps the contents of recentInvalidateSets
    */
   private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
-    Collection<Collection<Block>> values = recentInvalidateSets.values();
-    Iterator<Map.Entry<String,Collection<Block>>> it = 
-      recentInvalidateSets.entrySet().iterator();
-    if (values.size() == 0) {
-      out.println("Metasave: Blocks waiting deletion: 0");
+    int size = recentInvalidateSets.values().size();
+    out.println("Metasave: Blocks waiting deletion from "+size+" datanodes.");
+    if (size == 0) {
       return;
     }
-    out.println("Metasave: Blocks waiting deletion from " +
-                values.size() + " datanodes.");
-    while (it.hasNext()) {
-      Map.Entry<String,Collection<Block>> entry = it.next();
-      String storageId = entry.getKey();
-      DatanodeDescriptor node = datanodeMap.get(storageId);
-      Collection<Block> blklist = entry.getValue();
-      if (blklist.size() > 0) {
-        out.print(node.getName());
-        for (Iterator jt = blklist.iterator(); jt.hasNext();) {
-          Block block = (Block) jt.next();
-          out.print(" " + block); 
-        }
-        out.println("");
+    for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
+      Collection<Block> blocks = entry.getValue();
+      if (blocks.size() > 0) {
+        out.println(datanodeMap.get(entry.getKey()).getName() + " " + blocks);
       }
     }
   }
@@ -1478,7 +1489,7 @@
     }
     if (old.isUnderConstruction()) {
       INodeFileUnderConstruction cons = (INodeFileUnderConstruction) old;
-      leaseManager.removeLease(src, cons.getClientName());
+      leaseManager.removeLease(cons.clientName, src);
     }
     return true;
   }
@@ -1602,7 +1613,11 @@
    * @param src The filename
    * @param holder The datanode that was creating the file
    */
-  void internalReleaseCreate(String src, String holder) throws IOException {
+  void internalReleaseLease(Lease lease, String src) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("lease=" + lease + ", src=" + src);
+    }
+
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
@@ -1616,30 +1631,20 @@
                                    + src + " but file is already closed.");
       return;
     }
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: "
+          + src + " does not being written in " + lease);
+    }
+
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
 
-    // The last block that was allocated migth not have been used by the
-    // client. In this case, the size of the last block would be 0. A fsck
-    // will report this block as a missing block because no datanodes have it.
-    // Delete this block.
-    Block[] blocks = pendingFile.getBlocks();
-    if (blocks != null && blocks.length > 0) {
-      Block last = blocks[blocks.length - 1];
-      if (last.getNumBytes() == 0) {
-          pendingFile.removeBlock(last);
-          blocksMap.removeINode(last);
-          for (Iterator<DatanodeDescriptor> it = 
-               blocksMap.nodeIterator(last); it.hasNext();) {
-            DatanodeDescriptor node = it.next();
-            addToInvalidates(last, node);
-          }
-          /* What else do we need to do?
-           * removeStoredBlock()? we do different things when a block is 
-           * removed in different contexts. Mostly these should be
-           * same and/or should be in one place.
-           */
-      }
-    }
+    // Initialize lease recovery for pendingFile
+    pendingFile.assignPrimaryDatanode();
+  }
+
+  private void finalizeINodeFileUnderConstruction(String src,
+      INodeFileUnderConstruction pendingFile) throws IOException {
+    leaseManager.removeLease(pendingFile.clientName, src);
 
     // The file is no longer pending.
     // Create permanent INode, update blockmap
@@ -1649,14 +1654,58 @@
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);
 
-    // replicate blocks of this file.
     checkReplicationFactor(newFile);
-  
-    NameNode.stateChangeLog.info("DIR* NameSystem.internalReleaseCreate: " + 
-                                 src + " is no longer written to by " + 
-                                 holder);
   }
 
+  synchronized void commitBlockSynchronization(Block lastblock,
+      long newgenerationstamp, long newlength,
+      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("commitBlockSynchronization(lastblock=" + lastblock
+          + ", newgenerationstamp=" + newgenerationstamp
+          + ", newlength=" + newlength
+          + ", newtargets=" + Arrays.asList(newtargets) + ")");
+    }
+    BlockInfo blockinfo = blocksMap.getStoredBlock(lastblock);
+    if (blockinfo == null) {
+      throw new IOException("Block (=" + lastblock + ") not found");
+    }
+    INodeFile iFile = blockinfo.getINode();
+    if (!iFile.isUnderConstruction()) {
+      throw new IOException("Unexpected block (=" + lastblock
+          + ") since the file (=" + iFile.getLocalName()
+          + ") is not under construction");
+    }
+
+    //update block info
+    if (newtargets.length > 0) {
+      DatanodeDescriptor[] descriptors = new DatanodeDescriptor[newtargets.length];
+      for(int i = 0; i < newtargets.length; i++) {
+        descriptors[i] = getDatanode(newtargets[i]);
+      }
+      blockinfo.update(newgenerationstamp, newlength, descriptors);
+    }
+
+    // If this commit does not want to close the file, just persist
+    // block locations and return
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+    String src = leaseManager.findPath(pendingFile);
+    if (deleteblock) {
+      dir.removeBlock(src, pendingFile, lastblock);
+    }
+    if (!closeFile) {
+      dir.persistBlocks(src, pendingFile);
+      getEditLog().logSync();
+      return;
+    }
+    
+    //remove lease, close file
+    finalizeINodeFileUnderConstruction(src, pendingFile);
+    getEditLog().logSync();
+  }
+
+
   /**
    * Renew the lease(s) held by the given client
    */
@@ -1970,6 +2019,10 @@
         nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
         updateStats(nodeinfo, true);
         
+        //check lease recovery
+        if (cmd == null) {
+          cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        }
         //check pending replication
         if (cmd == null) {
           cmd = nodeinfo.getReplicationCommand(
@@ -4187,6 +4240,15 @@
     return generationStamp.getStamp();
   }
 
+  /**
+   * Increments, logs and then returns the stamp
+   */
+  long nextGenerationStamp() {
+    long gs = generationStamp.nextStamp();
+    getEditLog().logGenerationStamp(gs);
+    return gs;
+  }
+
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
   //

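For reference, commitBlockSynchronization has three outcomes selected by
the deleteblock/closeFile flags: remove the block, persist the updated
block state, or finalize the file. A minimal sketch of the caller side,
mirroring the two call shapes in LeaseManager.syncBlock later in this
patch (class and variable names are hypothetical; the class is assumed to
live in org.apache.hadoop.dfs beside the package-private types it uses):

    import java.io.IOException;
    import java.util.List;

    class CommitSketch {
      static void report(DatanodeProtocol namenode, Block block,
          long newStamp, long minLength, boolean closeFile,
          List<DatanodeID> updated) throws IOException {
        if (updated.isEmpty()) {
          // No replica survived recovery: deleteblock=true, no targets.
          namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
              DatanodeID.EMPTY_ARRAY);
        } else {
          // Replicas were rewritten to (newStamp, minLength):
          // deleteblock=false, so the namenode updates the BlockInfo and,
          // if closeFile is set, finalizes the file under construction.
          namenode.commitBlockSynchronization(block, newStamp, minLength,
              closeFile, false,
              updated.toArray(new DatanodeID[updated.size()]));
        }
      }
    }
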
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStamp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStamp.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStamp.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStamp.java Mon Jun  2 11:43:01 2008
@@ -23,31 +23,29 @@
 /****************************************************************
  * A GenerationStamp is a Hadoop FS primitive, identified by a long.
  ****************************************************************/
-class GenerationStamp implements WritableComparable {
+class GenerationStamp implements WritableComparable<GenerationStamp> {
+  public static final long WILDCARD_STAMP = 1;
+  public static final long FIRST_VALID_STAMP = 1000L;
 
   static {                                      // register a ctor
     WritableFactories.setFactory
       (GenerationStamp.class,
        new WritableFactory() {
-         public Writable newInstance() { return new GenerationStamp(); }
+         public Writable newInstance() { return new GenerationStamp(0); }
        });
   }
 
   long genstamp;
 
   /**
-   * Create a new instance, initialized to 0.
+   * Create a new instance, initialized to FIRST_VALID_STAMP.
    */
-  public GenerationStamp() {
-    this.genstamp = 0;
-  }
+  GenerationStamp() {this(GenerationStamp.FIRST_VALID_STAMP);}
 
   /**
    * Create a new instance, initialized to the specified value.
    */
-  public GenerationStamp(long stamp) {
-    this.genstamp = stamp;
-  }
+  GenerationStamp(long stamp) {this.genstamp = stamp;}
 
   /**
    * Returns the current generation stamp
@@ -64,9 +62,9 @@
   }
 
   /**
-   * Returns the current stamp and incrments the counter
+   * First increments the counter and then returns the stamp 
    */
-  public long nextStamp() {
+  public synchronized long nextStamp() {
     this.genstamp++;
     return this.genstamp;
   }
@@ -88,23 +86,28 @@
   /////////////////////////////////////
   // Comparable
   /////////////////////////////////////
-  public int compareTo(Object o) {
-    GenerationStamp b = (GenerationStamp) o;
-    if (genstamp < b.genstamp) {
-      return -1;
-    } else if (genstamp == b.genstamp) {
-      return 0;
-    } else {
-      return 1;
-    }
+  static int compare(long x, long y) {
+    return x < y? -1: x == y? 0: 1;
+  }
+
+  /** {@inheritDoc} */
+  public int compareTo(GenerationStamp that) {
+    return compare(this.genstamp, that.genstamp);
   }
+
+  /** {@inheritDoc} */
   public boolean equals(Object o) {
     if (!(o instanceof GenerationStamp)) {
       return false;
     }
     return genstamp == ((GenerationStamp)o).genstamp;
   }
-    
+
+  static boolean equalsWithWildcard(long x, long y) {
+    return x == y || x == WILDCARD_STAMP || y == WILDCARD_STAMP;  
+  }
+
+  /** {@inheritDoc} */
   public int hashCode() {
     return 37 * 17 + (int) (genstamp^(genstamp>>>32));
   }

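Since stamps now start at FIRST_VALID_STAMP and WILDCARD_STAMP compares
equal to any stamp, the new semantics can be demonstrated with a minimal
same-package snippet (hypothetical class name):

    class GenerationStampDemo {
      public static void main(String[] args) {
        GenerationStamp gs = new GenerationStamp();    // genstamp == 1000
        long next = gs.nextStamp();                    // increments first: 1001
        // WILDCARD_STAMP (=1) matches anything:
        System.out.println(GenerationStamp.equalsWithWildcard(
            next, GenerationStamp.WILDCARD_STAMP));    // true
        // ordinary stamps still compare exactly:
        System.out.println(GenerationStamp.equalsWithWildcard(
            GenerationStamp.FIRST_VALID_STAMP, next)); // false
      }
    }
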
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java Mon Jun  2 11:43:01 2008
@@ -713,27 +713,6 @@
   }
 
   /**
-   * remove a block from the block list. This block should be
-   * the last one on the list.
-   */
-  void removeBlock(Block oldblock) throws IOException {
-    if (this.blocks == null) {
-      throw new IOException("Trying to delete non-existant block " +
-                            oldblock);
-    }
-    int size = this.blocks.length;
-    if (!this.blocks[size-1].equals(oldblock)) {
-      throw new IOException("Trying to delete non-existant block " +
-                            oldblock);
-    }
-    BlockInfo[] newlist = new BlockInfo[size - 1];
-    for (int i = 0; i < size-1; i++) {
-        newlist[i] = this.blocks[i];
-    }
-    this.blocks = newlist;
-  }
-
-  /**
    * Set file block
    */
   void setBlock(int idx, BlockInfo blk) {
@@ -777,18 +756,29 @@
     }
     return blocks[blocks.length - 2];
   }
+
+  INodeFileUnderConstruction toINodeFileUnderConstruction(
+      String clientName, String clientMachine, DatanodeDescriptor clientNode
+      ) throws IOException {
+    if (isUnderConstruction()) {
+      return (INodeFileUnderConstruction)this;
+    }
+    return new INodeFileUnderConstruction(name,
+        blockReplication, modificationTime, preferredBlockSize,
+        blocks, getPermissionStatus(),
+        clientName, clientMachine, clientNode);
+  }
 }
 
 class INodeFileUnderConstruction extends INodeFile {
-  protected StringBytesWritable clientName;         // lease holder
-  protected StringBytesWritable clientMachine;
-  protected DatanodeDescriptor clientNode; // if client is a cluster node too.
-
-  INodeFileUnderConstruction() {
-    clientName = null;
-    clientMachine = null;
-    clientNode = null;
-  }
+  StringBytesWritable clientName = null;         // lease holder
+  StringBytesWritable clientMachine = null;
+  DatanodeDescriptor clientNode = null; // if client is a cluster node too.
+
+  private int primaryNodeIndex = -1; //the node working on lease recovery
+  DatanodeDescriptor[] targets = null;   //locations for last block
+  
+  INodeFileUnderConstruction() {}
 
   INodeFileUnderConstruction(PermissionStatus permissions,
                              short replication,
@@ -855,4 +845,49 @@
     return obj;
     
   }
+
+  /**
+   * remove a block from the block list. This block should be
+   * the last one on the list.
+   */
+  void removeBlock(Block oldblock) throws IOException {
+    if (blocks == null) {
+      throw new IOException("Trying to delete non-existant block " + oldblock);
+    }
+    int size_1 = blocks.length - 1;
+    if (!blocks[size_1].equals(oldblock)) {
+      throw new IOException("Trying to delete non-last block " + oldblock);
+    }
+
+    //copy to a new list
+    BlockInfo[] newlist = new BlockInfo[size_1];
+    System.arraycopy(blocks, 0, newlist, 0, size_1);
+    blocks = newlist;
+    
+    // Remove the block locations for the last block.
+    targets = null;
+  }
+
+  /**
+   * Initialize lease recovery for this file:
+   * choose a primary datanode for the last block and queue it for recovery.
+   */
+  void assignPrimaryDatanode() {
+    //assign the first alive datanode as the primary datanode
+    if (targets.length == 0) {
+      NameNode.stateChangeLog.warn("BLOCK*"
+          + " INodeFileUnderConstruction.assignPrimaryDatanode:"
+          + " no targets are alive.");
+    }
+
+    int previous = primaryNodeIndex;
+    //find an alive datanode beginning from previous
+    for(int i = 1; i <= targets.length; i++) {
+      int j = (previous + i)%targets.length;
+      if (targets[j].isAlive) {
+        DatanodeDescriptor primary = targets[primaryNodeIndex = j];
+        primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
+        return; //stop at the first alive datanode
+      }
+    }
+  }
 }

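The rotation in assignPrimaryDatanode is easiest to see in isolation.
A standalone sketch of the index arithmetic, with a boolean[] standing in
for DatanodeDescriptor.isAlive (names are illustrative):

    class PrimarySelectionSketch {
      /** Pick the first alive target after 'previous', wrapping around. */
      static int nextPrimary(boolean[] isAlive, int previous) {
        for (int i = 1; i <= isAlive.length; i++) {
          int j = (previous + i) % isAlive.length;
          if (isAlive[j]) {
            return j;     // becomes the new primaryNodeIndex
          }
        }
        return -1;        // no target alive; a later monitor pass retries
      }
    }
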
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java Mon Jun  2 11:43:01 2008
@@ -30,17 +30,17 @@
   public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
 
   /**
-   * 1: added getBlockMetaDataInfo and updateGenerationStamp
+   * 2: change updateGenerationStamp to updateBlock
    */
-  public static final long versionID = 1L;
+  public static final long versionID = 2L;
 
-  /** @return the BlockMetaDataInfo of a block */
+  /** @return the BlockMetaDataInfo of a block,
+   *  or null if the block is not found 
+   */
   BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
 
   /**
-   * Update the GenerationStamp of a block
-   * @return true iff update was required and done successfully 
+   * Update the block to the new generation stamp and length.  
    */
-  boolean updateGenerationStamp(Block block, GenerationStamp generationstamp
-      ) throws IOException;
+  void updateBlock(Block oldblock, Block newblock) throws IOException;
 }
\ No newline at end of file

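A single probe-and-update round against this protocol looks roughly as
follows; the helper is hypothetical and assumed to sit in the same
package, and LeaseManager.recoverBlock later in this patch performs the
full loop over all replicas:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    class RecoveryRoundSketch {
      static void recoverOne(DatanodeID id, Block block, long agreedLength,
          long newStamp, Configuration conf) throws IOException {
        InterDatanodeProtocol dn =
            DataNode.createInterDataNodeProtocolProxy(id, conf);
        BlockMetaDataInfo info = dn.getBlockMetaDataInfo(block); // may be null
        if (info != null
            && info.getGenerationStamp() >= block.generationStamp) {
          // The replica's stamp is current: rewrite it to the agreed
          // generation stamp and length.
          dn.updateBlock(block,
              new Block(block.blkid, agreedLength, newStamp));
        }
      }
    }
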
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java Mon Jun  2 11:43:01 2008
@@ -18,14 +18,39 @@
 package org.apache.hadoop.dfs;
 
 import java.io.IOException;
+import java.io.FileNotFoundException;
 import java.util.*;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
+/**
+ * LeaseManager does the lease housekeeping for files that are being written.
+ * This class also provides useful static methods for lease recovery.
+ * 
+ * Lease Recovery Algorithm
+ * 1) Namenode retrieves lease information
+ * 2) For each file f in the lease, consider the last block b of f
+ * 2.1) Get the datanodes which contain b
+ * 2.2) Assign one of the datanodes as the primary datanode p
+ *
+ * 2.3) p obtains a new generation stamp from the namenode
+ * 2.4) p gets the block info from each datanode
+ * 2.5) p computes the minimum block length
+ * 2.6) p updates the datanodes, which have a valid generation stamp,
+ *      with the new generation stamp and the minimum block length 
+ * 2.7) p reports the update results to the namenode
+ *
+ * 2.8) Namenode updates the BlockInfo
+ * 2.9) Namenode removes f from the lease
+ *      and removes the lease once all files have been removed
+ * 2.10) Namenode commits the changes to the edit log
+ */
 class LeaseManager {
-  static final Log LOG = FSNamesystem.LOG;
+  static final Log LOG = LogFactory.getLog(LeaseManager.class);
 
   private final FSNamesystem fsnamesystem;
 
@@ -48,8 +73,8 @@
 
   LeaseManager(FSNamesystem fsnamesystem) {this.fsnamesystem = fsnamesystem;}
 
-  Lease getLease(String holder) throws IOException {
-    return leases.get(new StringBytesWritable(holder));
+  Lease getLease(StringBytesWritable holder) throws IOException {
+    return leases.get(holder);
   }
   
   SortedSet<Lease> getSortedLeases() {return sortedLeases;}
@@ -72,42 +97,68 @@
   /**
    * Adds (or re-adds) the lease for the specified file.
    */
-  synchronized void addLease(String src, String holder) throws IOException {
+  synchronized void addLease(StringBytesWritable holder, String src
+      ) throws IOException {
     Lease lease = getLease(holder);
     if (lease == null) {
       lease = new Lease(holder);
-      leases.put(new StringBytesWritable(holder), lease);
+      leases.put(holder, lease);
       sortedLeases.add(lease);
     } else {
-      sortedLeases.remove(lease);
-      lease.renew();
-      sortedLeases.add(lease);
+      renewLease(lease);
     }
     sortedLeasesByPath.put(src, lease);
-    lease.startedCreate(src);
+    lease.paths.add(new StringBytesWritable(src));
   }
 
   /**
-   * deletes the lease for the specified file
+   * Remove the specified lease and src.
    */
-  synchronized void removeLease(String src, String holder) throws IOException {
-    Lease lease = getLease(holder);
-    if (lease != null) {
-      lease.completedCreate(src);
-      sortedLeasesByPath.remove(src);
+  synchronized void removeLease(Lease lease, String src) throws IOException {
+    sortedLeasesByPath.remove(src);
+    if (!lease.paths.remove(new StringBytesWritable(src))) {
+      LOG.error(src + " not found in lease.paths (=" + lease.paths + ")");
+    }
+
+    if (!lease.hasPath()) {
+      leases.remove(lease.holder);
+      if (!sortedLeases.remove(lease)) {
+        LOG.error(lease + " not found in sortedLeases");
+      }
+    }
+  }
+
+  /**
+   * Remove the lease for the specified holder and src
+   */
+  synchronized void removeLease(StringBytesWritable holder, String src
+      ) throws IOException {
+    removeLease(getLease(holder), src);
+  }
 
-      if (!lease.hasPath()) {
-        leases.remove(new StringBytesWritable(holder));
-        sortedLeases.remove(lease);
+  /**
+   * Finds the pathname for the specified pendingFile
+   */
+  synchronized String findPath(INodeFileUnderConstruction pendingFile
+      ) throws IOException {
+    Lease lease = getLease(pendingFile.clientName);
+    if (lease != null) {
+      String src = lease.findPath(pendingFile);
+      if (src != null) {
+        return src;
       }
     }
+    throw new IOException("pendingFile (=" + pendingFile + ") not found."
+        + "(lease=" + lease + ")");
   }
 
   /**
    * Renew the lease(s) held by the given client
    */
   synchronized void renewLease(String holder) throws IOException {
-    Lease lease = getLease(holder);
+    renewLease(getLease(new StringBytesWritable(holder)));
+  }
+  synchronized void renewLease(Lease lease) {
     if (lease != null) {
       sortedLeases.remove(lease);
       lease.renew();
@@ -115,21 +166,6 @@
     }
   }
 
-  synchronized void removeExpiredLease(Lease lease) throws IOException {
-    String holder = lease.holder.getString();
-    for(StringBytesWritable sbw : lease.paths) {
-      String p = sbw.getString();
-      fsnamesystem.internalReleaseCreate(p, holder);
-      sortedLeasesByPath.remove(p);
-    }
-    lease.paths.clear();
-    
-    leases.remove(lease.holder);
-    if (!sortedLeases.remove(lease)) {
-      LOG.error("removeExpiredLease: " + lease + " not found in sortedLeases");
-    }
-  }
-
   /************************************************************
    * A Lease governs all the locks held by a single client.
    * For each client there's a corresponding lease, whose
@@ -142,11 +178,13 @@
     private long lastUpdate;
     private Collection<StringBytesWritable> paths = new TreeSet<StringBytesWritable>();
   
-    public Lease(String holder) throws IOException {
-      this.holder = new StringBytesWritable(holder);
+    /** Only LeaseManager object can create a lease */
+    private Lease(StringBytesWritable holder) throws IOException {
+      this.holder = holder;
       renew();
     }
-    public void renew() {
+    /** Only LeaseManager object can renew a lease */
+    private void renew() {
       this.lastUpdate = FSNamesystem.now();
     }
 
@@ -160,14 +198,20 @@
       return FSNamesystem.now() - lastUpdate > softLimit;
     }
 
-    void startedCreate(String src) throws IOException {
-      paths.add(new StringBytesWritable(src));
-    }
-
-    boolean completedCreate(String src) throws IOException {
-      return paths.remove(new StringBytesWritable(src));
+    /**
+     * @return the path associated with the pendingFile, or null if not found.
+     */
+    private String findPath(INodeFileUnderConstruction pendingFile) {
+      for(Iterator<StringBytesWritable> i = paths.iterator(); i.hasNext(); ) {
+        String src = i.next().toString();
+        if (fsnamesystem.dir.getFileINode(src) == pendingFile) {
+          return src;
+        }
+      }
+      return null;
     }
 
+    /** Does this lease contain any path? */
     boolean hasPath() {return !paths.isEmpty();}
 
     /** {@inheritDoc} */
@@ -309,8 +353,11 @@
                      ((top = sortedLeases.first()) != null)) {
                 if (top.expiredHardLimit()) {
                   LOG.info("Lease Monitor: Removing lease " + top
-                      + ", leases remaining: " + sortedLeases.size());
-                  removeExpiredLease(top);
+                      + ", sortedLeases.size()=: " + sortedLeases.size());
+                  for(StringBytesWritable s : top.paths) {
+                    fsnamesystem.internalReleaseLease(top, s.getString());
+                  }
+                  renewLease(top);
                 } else {
                   break;
                 }
@@ -330,4 +377,119 @@
       }
     }
   }
+
+  /*
+   * The following code provides useful static methods for lease recovery.
+   */
+  /** A convenient class used in lease recovery */
+  private static class BlockRecord { 
+    final DatanodeID id;
+    final InterDatanodeProtocol datanode;
+    final Block block;
+    
+    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+      this.id = id;
+      this.datanode = datanode;
+      this.block = block;
+    }
+
+    public String toString() {
+      return "block:" + block + " node:" + id;
+    }
+  }
+
+  /**
+   * Recover a list of blocks.
+   * This method is invoked by the primary datanode.
+   */
+  static void recoverBlocks(Block[] blocks, DatanodeID[][] targets,
+      DatanodeProtocol namenode, Configuration conf) {
+    for(int i = 0; i < blocks.length; i++) {
+      try {
+        recoverBlock(blocks[i], targets[i], namenode, conf, true);
+      } catch (IOException e) {
+        LOG.warn("recoverBlocks, i=" + i, e);
+      }
+    }
+  }
+
+  /** Recover a block */
+  static Block recoverBlock(Block block, DatanodeID[] datanodeids,
+      DatanodeProtocol namenode, Configuration conf,
+      boolean closeFile) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("block=" + block
+          + ", datanodeids=" + Arrays.asList(datanodeids));
+    }
+    List<BlockRecord> syncList = new ArrayList<BlockRecord>();
+    long minlength = Long.MAX_VALUE;
+    int errorCount = 0;
+
+    //check generation stamps
+    for(DatanodeID id : datanodeids) {
+      try {
+        InterDatanodeProtocol datanode
+            = DataNode.createInterDataNodeProtocolProxy(id, conf);
+        BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
+        if (info != null && info.getGenerationStamp() >= block.generationStamp) {
+          syncList.add(new BlockRecord(id, datanode, new Block(info)));
+          if (info.len < minlength) {
+            minlength = info.len;
+          }
+        }
+      } catch (IOException e) {
+        ++errorCount;
+        InterDatanodeProtocol.LOG.warn(
+            "Failed to getBlockMetaDataInfo for block (=" + block 
+            + ") from datanode (=" + id + ")", e);
+      }
+    }
+
+    if (syncList.isEmpty() && errorCount > 0) {
+      throw new IOException("All datanodes failed: block=" + block
+          + ", datanodeids=" + Arrays.asList(datanodeids));
+    }
+    return syncBlock(block, minlength, syncList, namenode, closeFile);
+  }
+
+  /** Block synchronization */
+  private static Block syncBlock(Block block, long minlength,
+      List<BlockRecord> syncList, DatanodeProtocol namenode,
+      boolean closeFile) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("block=" + block + ", minlength=" + minlength
+          + ", syncList=" + syncList + ", closeFile=" + closeFile);
+    }
+
+    //syncList.isEmpty() means that none of the datanodes have the block,
+    //so the block can be deleted.
+    if (syncList.isEmpty()) {
+      namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
+          DatanodeID.EMPTY_ARRAY);
+      return null;
+    }
+
+    List<DatanodeID> successList = new ArrayList<DatanodeID>();
+
+    long generationstamp = namenode.nextGenerationStamp();
+    Block newblock = new Block(block.blkid, minlength, generationstamp);
+
+    for(BlockRecord r : syncList) {
+      try {
+        r.datanode.updateBlock(r.block, newblock);
+        successList.add(r.id);
+      } catch (IOException e) {
+        InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+            + newblock + ", datanode=" + r.id + ")", e);
+      }
+    }
+
+    if (!successList.isEmpty()) {
+      namenode.commitBlockSynchronization(block,
+          newblock.generationStamp, newblock.len, closeFile, false,
+          successList.toArray(new DatanodeID[successList.size()]));
+      return newblock; // success
+    }
+    return null; // failed
+  }
 }

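For completeness, the primary-datanode side that drives these helpers is
not shown in this part of the patch; a hedged sketch of what such a hook
might look like (the handler name and command plumbing are assumptions):

    import org.apache.hadoop.conf.Configuration;

    class RecoverCommandSketch {
      /** Invoked when the namenode hands this datanode blocks to recover. */
      static void handle(Block[] blocks, DatanodeID[][] targets,
          DatanodeProtocol namenode, Configuration conf) {
        // Each blocks[i] pairs with targets[i], the datanodes holding a
        // replica; recoverBlocks logs and swallows per-block IOExceptions.
        LeaseManager.recoverBlocks(blocks, targets, namenode, conf);
      }
    }
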
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon Jun  2 11:43:01 2008
@@ -288,6 +288,20 @@
     myMetrics.numFilesCreated.inc();
   }
 
+  /** Coming in a future release... */
+  void append(String src, String clientName) throws IOException {
+    String clientMachine = getClientMachine();
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.append: file "
+          +src+" for "+clientName+" at "+clientMachine);
+    }
+    //TODO: add namesystem.appendFile(...), which calls appendFileInternal(...)
+    namesystem.appendFileInternal(src, clientName, clientMachine);
+
+    //TODO: inc myMetrics;
+  }
+
+  /** {@inheritDoc} */
   public boolean setReplication(String src, 
                                 short replication
                                 ) throws IOException {
@@ -358,6 +372,20 @@
     }
   }
 
+  /** {@inheritDoc} */
+  public long nextGenerationStamp() {
+    return namesystem.nextGenerationStamp();
+  }
+
+  /** {@inheritDoc} */
+  public void commitBlockSynchronization(Block block,
+      long newgenerationstamp, long newlength,
+      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
+      ) throws IOException {
+    namesystem.commitBlockSynchronization(block,
+        newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
+  }
+  
   public long getPreferredBlockSize(String filename) throws IOException {
     return namesystem.getPreferredBlockSize(filename);
   }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/StringBytesWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/StringBytesWritable.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/StringBytesWritable.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/StringBytesWritable.java Mon Jun  2 11:43:01 2008
@@ -46,6 +46,15 @@
     return new String(get(),"UTF8");
   }
 
+  /** {@inheritDoc} */
+  public String toString() {
+    try {
+      return getString();
+    } catch (IOException e) {
+      throw (RuntimeException)new RuntimeException().initCause(e);
+    }
+  }
+
   /**
    * Compare to a String.
    */

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java Mon Jun  2 11:43:01 2008
@@ -97,6 +97,15 @@
         oStream = null;
       }
     }
+
+    synchronized long getGenerationStamp() {
+      return theBlock.getGenerationStamp();
+    }
+
+    synchronized void updateBlock(Block b) {
+      theBlock.generationStamp = b.generationStamp;
+      setlength(b.len);
+    }
     
     synchronized long getlength() {
       if (!finalized) {
@@ -292,6 +301,27 @@
     return binfo.getlength();
   }
 
+  /** {@inheritDoc} */
+  public Block getStoredBlock(long blkid) throws IOException {
+    Block b = new Block(blkid);
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      return null;
+    }
+    b.generationStamp = binfo.getGenerationStamp();
+    b.len = binfo.getlength();
+    return b;
+  }
+
+  /** {@inheritDoc} */
+  public void updateBlock(Block oldblock, Block newblock) throws IOException {
+    BInfo binfo = blockMap.get(newblock);
+    if (binfo == null) {
+      throw new IOException("BInfo not found, b=" + newblock);
+    }
+    binfo.updateBlock(newblock);
+  }
+
   public synchronized void invalidate(Block[] invalidBlks) throws IOException {
     boolean error = false;
     if (invalidBlks == null) {
@@ -586,5 +616,4 @@
   public String getStorageInfo() {
     return "Simulated FSDataset-" + storageId;
   }
-
 }