Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/01 00:57:34 UTC

svn commit: r820487 [4/6] - in /hadoop/hdfs/branches/branch-0.21: ./ .eclipse.templates/.launches/ lib/ src/contrib/block_forensics/ src/contrib/block_forensics/client/ src/contrib/block_forensics/ivy/ src/contrib/block_forensics/src/ src/contrib/block...

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Wed Sep 30 22:57:30 2009
@@ -22,7 +22,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -37,9 +36,11 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.security.AccessTokenHandler;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -237,6 +238,80 @@
   }
 
   /**
+   * Commit the last block of the file.
+   * 
+   * @param fileINode file inode
+   * @param commitBlock - contains client-reported block length and generation stamp
+   * @throws IOException if the block does not have at least a minimal number
+   * of replicas reported from data-nodes.
+   */
+  void commitLastBlock(INodeFileUnderConstruction fileINode, 
+                       Block commitBlock) throws IOException {
+    if(commitBlock == null)
+      return; // not committing, this is a block allocation retry
+    BlockInfo lastBlock = fileINode.getLastBlock();
+    if(lastBlock == null)
+      return; // no blocks in file yet
+    if(lastBlock.isComplete())
+      return; // already completed (e.g. by syncBlock)
+    assert lastBlock.getNumBytes() <= commitBlock.getNumBytes() :
+      "commitBlock length is less than the stored one "
+      + commitBlock.getNumBytes() + " vs. " + lastBlock.getNumBytes();
+    ((BlockInfoUnderConstruction)lastBlock).commitBlock(commitBlock);
+  }
+
+  /**
+   * Convert a specified block of the file to a complete block.
+   * @param fileINode file
+   * @param blkIndex  block index in the file
+   * @throws IOException if the block does not have at least a minimal number
+   * of replicas reported from data-nodes.
+   */
+  BlockInfo completeBlock(INodeFile fileINode, int blkIndex)
+  throws IOException {
+    if(blkIndex < 0)
+      return null;
+    BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
+    if(curBlock.isComplete())
+      return curBlock;
+    BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
+    if(ucBlock.numNodes() < minReplication)
+      throw new IOException("Cannot complete block: " +
+          "block does not satisfy minimal replication requirement.");
+    BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
+    // replace the block in the file with the completed one
+    fileINode.setBlock(blkIndex, completeBlock);
+    // replace block in the blocksMap
+    return blocksMap.replaceBlock(completeBlock);
+  }
+
+  BlockInfo completeBlock(INodeFile fileINode, BlockInfo block)
+  throws IOException {
+    BlockInfo[] fileBlocks = fileINode.getBlocks();
+    for(int idx = 0; idx < fileBlocks.length; idx++)
+      if(fileBlocks[idx] == block) {
+        return completeBlock(fileINode, idx);
+      }
+    return block;
+  }
+
+  /**
+   * Convert the last block of the file to an under construction block.
+   * @param fileINode file
+   * @param targets data-nodes that will form the pipeline for this block
+   */
+  void convertLastBlockToUnderConstruction(
+      INodeFileUnderConstruction fileINode,
+      DatanodeDescriptor[] targets) throws IOException {
+    BlockInfo oldBlock = fileINode.getLastBlock();
+    if(oldBlock == null)
+      return;
+    BlockInfoUnderConstruction ucBlock =
+      fileINode.setLastBlock(oldBlock, targets);
+    blocksMap.replaceBlock(ucBlock);
+  }
+
+  /**
    * Get all valid locations of the block
    */
   ArrayList<String> getValidLocations(Block block) {
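
For reference, the three helpers added above implement the last-block life cycle this change introduces: a client-reported length commits the block (COMMITTED), and the block becomes COMPLETE only once at least minReplication replicas have been reported. A minimal self-contained sketch of that transition, with toy types standing in for BlockInfo and BlockInfoUnderConstruction (names below are illustrative, not the actual HDFS classes):

    import java.io.IOException;

    enum BlockUCState { UNDER_CONSTRUCTION, COMMITTED, COMPLETE }

    class ToyLastBlock {
      BlockUCState state = BlockUCState.UNDER_CONSTRUCTION;
      long numBytes;          // bytes acknowledged so far
      int reportedReplicas;   // replicas confirmed by data-nodes

      // commitLastBlock analogue: the client reports the final length
      void commit(long clientReportedBytes) {
        assert numBytes <= clientReportedBytes :
          "commitBlock length is less than the stored one";
        numBytes = clientReportedBytes;
        state = BlockUCState.COMMITTED;
      }

      // completeBlock analogue: only minimally replicated blocks complete
      void complete(int minReplication) throws IOException {
        if (reportedReplicas < minReplication)
          throw new IOException("Cannot complete block: block does not "
              + "satisfy minimal replication requirement.");
        state = BlockUCState.COMPLETE;
      }
    }

    public class BlockStateDemo {
      public static void main(String[] args) throws IOException {
        ToyLastBlock b = new ToyLastBlock();
        b.numBytes = 512;
        b.commit(1024);              // client commits the final length
        b.reportedReplicas = 1;
        b.complete(1);               // minReplication = 1
        System.out.println(b.state); // COMPLETE
      }
    }
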
@@ -254,7 +329,7 @@
     return machineSet;
   }
 
-  List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
+  List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
       long length, int nrBlocksToReturn) throws IOException {
     int curBlk = 0;
     long curPos = 0, blkSize = 0;
@@ -269,43 +344,12 @@
     }
 
     if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return null;
+      return Collections.<LocatedBlock>emptyList();
 
     long endOff = offset + length;
     List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
     do {
-      // get block locations
-      int numNodes = blocksMap.numNodes(blocks[curBlk]);
-      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
-      int numCorruptReplicas = corruptReplicas
-          .numCorruptReplicas(blocks[curBlk]);
-      if (numCorruptNodes != numCorruptReplicas) {
-        FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
-            + blocks[curBlk] + "blockMap has " + numCorruptNodes
-            + " but corrupt replicas map has " + numCorruptReplicas);
-      }
-      boolean blockCorrupt = (numCorruptNodes == numNodes);
-      int numMachineSet = blockCorrupt ? numNodes :
-                          (numNodes - numCorruptNodes);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
-      if (numMachineSet > 0) {
-        numNodes = 0;
-        for (Iterator<DatanodeDescriptor> it = 
-             blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          DatanodeDescriptor dn = it.next();
-          boolean replicaCorrupt = 
-            corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
-          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
-            machineSet[numNodes++] = dn;
-        }
-      }
-      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
-          blockCorrupt);
-      if (namesystem.isAccessTokenEnabled) {
-        b.setAccessToken(namesystem.accessTokenHandler.generateToken(b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
-      }
-      results.add(b);
+      results.add(getBlockLocation(blocks[curBlk], curPos));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
@@ -314,6 +358,41 @@
     return results;
   }
 
+  /** @return a LocatedBlock for the given block */
+  LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
+      ) throws IOException {
+    if (!blk.isComplete()) {
+      final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
+      final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+      return namesystem.createLocatedBlock(uc, locations, pos, false);
+    }
+
+    // get block locations
+    final int numCorruptNodes = countNodes(blk).corruptReplicas();
+    final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
+    if (numCorruptNodes != numCorruptReplicas) {
+      FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
+          + blk + " blockMap has " + numCorruptNodes
+          + " but corrupt replicas map has " + numCorruptReplicas);
+    }
+
+    final int numNodes = blocksMap.numNodes(blk);
+    final boolean isCorrupt = numCorruptNodes == numNodes;
+    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+    final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+    if (numMachines > 0) {
+      int j = 0;
+      for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
+          it.hasNext();) {
+        final DatanodeDescriptor d = it.next();
+        final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
+        if (isCorrupt || (!isCorrupt && !replicaCorrupt))
+          machines[j++] = d;
+      }
+    }
+    return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);    
+  }
+
   /**
    * Check whether the replication parameter is within the range
    * determined by system configuration.
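
The selection rule in the new getBlockLocation is carried over unchanged from the loop it replaces: if every replica is corrupt, all of them are returned (presumably so a client can still attempt the read); otherwise only the healthy ones are. A standalone sketch of just that filter, with strings standing in for DatanodeDescriptor:

    import java.util.*;

    public class ReplicaFilterDemo {
      /** All nodes when every replica is corrupt, else only healthy ones. */
      static List<String> selectMachines(List<String> nodes, Set<String> corrupt) {
        boolean allCorrupt = corrupt.containsAll(nodes); // numCorrupt == numNodes
        List<String> machines = new ArrayList<String>();
        for (String dn : nodes)
          if (allCorrupt || !corrupt.contains(dn))
            machines.add(dn);
        return machines;
      }

      public static void main(String[] args) {
        List<String> nodes = Arrays.asList("dn1", "dn2", "dn3");
        Set<String> oneBad = new HashSet<String>(Arrays.asList("dn2"));
        System.out.println(selectMachines(nodes, oneBad));                     // [dn1, dn3]
        System.out.println(selectMachines(nodes, new HashSet<String>(nodes))); // all returned
      }
    }
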
@@ -369,7 +448,7 @@
       pendingDeletionBlocksCount++;
       if (log) {
         NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-            + b.getBlockName() + " to " + dn.getName());
+            + b + " to " + dn.getName());
       }
     }
   }
@@ -399,7 +478,7 @@
     }
     if (datanodes.length() != 0) {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-          + b.getBlockName() + " to " + datanodes.toString());
+          + b + " to " + datanodes.toString());
     }
   }
 
@@ -457,6 +536,10 @@
       addToInvalidates(storedBlock, node);
       return;
     } 
+
+    // Add replica to the data-node if it is not already there
+    node.addBlock(storedBlock);
+
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(storedBlock, node);
     if (countNodes(storedBlock).liveReplicas() > inode.getReplication()) {
@@ -885,7 +968,8 @@
     Collection<Block> toAdd = new LinkedList<Block>();
     Collection<Block> toRemove = new LinkedList<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
-    node.reportDiff(blocksMap, report, toAdd, toRemove, toInvalidate);
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
 
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -899,6 +983,9 @@
           + " does not belong to any file.");
       addToInvalidates(b, node);
     }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -908,7 +995,8 @@
    */
   private Block addStoredBlock(final Block block,
                                DatanodeDescriptor node,
-                               DatanodeDescriptor delNodeHint) {
+                               DatanodeDescriptor delNodeHint)
+  throws IOException {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
@@ -1020,13 +1108,17 @@
     int numCurrentReplica = numLiveReplicas
       + pendingReplications.getNumReplicas(storedBlock);
 
+    if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
+        numLiveReplicas >= minReplication)
+      storedBlock = completeBlock(fileINode, storedBlock);
+
     // check whether safe replication is reached for the block
-    namesystem.incrementSafeBlockCount(numCurrentReplica);
+    // only complete blocks are counted towards that
+    if(storedBlock.isComplete())
+      namesystem.incrementSafeBlockCount(numCurrentReplica);
 
-    //
-    // if file is being actively written to, then do not check
-    // replication-factor here. It will be checked when the file is closed.
-    //
+    // if file is under construction, then check whether the block
+    // can be completed
     if (fileINode.isUnderConstruction()) {
       return storedBlock;
     }
@@ -1253,7 +1345,30 @@
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.remove(block);
-    addStoredBlock(block, node, delHintNode);
+
+    // blockReceived reports a finalized block
+    Collection<Block> toAdd = new LinkedList<Block>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.processReportedBlock(this, block, ReplicaState.FINALIZED,
+                              toAdd, toInvalidate, toCorrupt);
+    // the block is only in one of the lists
+    // if it is in none then data-node already has it
+    assert toAdd.size() + toInvalidate.size() <= 1 :
+      "The block should be only in one of the lists.";
+
+    for (Block b : toAdd) {
+      addStoredBlock(b, node, delHintNode);
+    }
+    for (Block b : toInvalidate) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
+          + b + " on " + node.getName() + " size " + b.getNumBytes()
+          + " does not belong to any file.");
+      addToInvalidates(b, node);
+    }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -1351,6 +1466,14 @@
     return blocksMap.getStoredBlock(block);
   }
 
+  /**
+   * Find the block by block ID.
+   */
+  BlockInfo findStoredBlock(long blockId) {
+    Block wildcardBlock = new Block(blockId, 0, GenerationStamp.WILDCARD_STAMP);
+    return blocksMap.getStoredBlock(wildcardBlock);
+  }
+
   /* updates a block in under replication queue */
   void updateNeededReplications(Block block, int curReplicasDelta,
       int expectedReplicasDelta) {
@@ -1522,7 +1645,7 @@
     return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);
   }
 
-  BlockInfo addINode(Block block, INodeFile iNode) {
+  BlockInfo addINode(BlockInfo block, INodeFile iNode) {
     return blocksMap.addINode(block, iNode);
   }
 

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Wed Sep 30 22:57:30 2009
@@ -75,11 +75,10 @@
   /**
    * Add block b belonging to the specified file inode to the map.
    */
-  BlockInfo addINode(Block b, INodeFile iNode) {
-    int replication = iNode.getReplication();
+  BlockInfo addINode(BlockInfo b, INodeFile iNode) {
     BlockInfo info = map.get(b);
-    if (info == null) {
-      info = new BlockInfo(b, replication);
+    if (info != b) {
+      info = b;
       map.put(info, info);
     }
     info.setINode(iNode);
@@ -191,4 +190,23 @@
   float getLoadFactor() {
     return loadFactor;
   }
+
+  /**
+   * Replace a block in the block map by a new block.
+   * The new block and the old one have the same key.
+   * @param newBlock - block for replacement
+   * @return new block
+   */
+  BlockInfo replaceBlock(BlockInfo newBlock) {
+    BlockInfo currentBlock = map.get(newBlock);
+    assert currentBlock != null : "the block is not in blocksMap";
+    // replace block in data-node lists
+    for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) {
+      DatanodeDescriptor dn = currentBlock.getDatanode(idx);
+      dn.replaceBlock(currentBlock, newBlock);
+    }
+    // replace block in the map itself
+    map.put(newBlock, newBlock);
+    return newBlock;
+  }
 }
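
replaceBlock works because the completed block and the block it replaces are equal as map keys (same block ID), so after the per-datanode lists are rewired a single put swaps the mapping in place. A small demonstration of the map mechanics, assuming the backing map behaves like java.util.HashMap (which the loadFactor accessor above suggests); note that HashMap keeps the old key object and replaces only the value, which suffices here because getStoredBlock returns the value:

    import java.util.*;

    public class ReplaceByEqualKeyDemo {
      static final class BlockKey {
        final long id;
        BlockKey(long id) { this.id = id; }
        @Override public boolean equals(Object o) {
          return o instanceof BlockKey && ((BlockKey) o).id == id;
        }
        @Override public int hashCode() { return (int) id; }
      }

      public static void main(String[] args) {
        BlockKey uc = new BlockKey(7);       // under-construction block
        BlockKey complete = new BlockKey(7); // equal key, different object

        Map<BlockKey, BlockKey> map = new HashMap<BlockKey, BlockKey>();
        map.put(uc, uc);
        map.put(complete, complete);         // same slot, new value object
        System.out.println(map.get(new BlockKey(7)) == complete); // true
      }
    }
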

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Wed Sep 30 22:57:30 2009
@@ -25,7 +25,11 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
@@ -55,29 +59,36 @@
   }
 
   /** A BlockTargetPair queue. */
-  private static class BlockQueue {
-    private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+  private static class BlockQueue<E> {
+    private final Queue<E> blockq = new LinkedList<E>();
 
     /** Size of the queue */
     synchronized int size() {return blockq.size();}
 
     /** Enqueue */
-    synchronized boolean offer(Block block, DatanodeDescriptor[] targets) { 
-      return blockq.offer(new BlockTargetPair(block, targets));
+    synchronized boolean offer(E e) { 
+      return blockq.offer(e);
     }
 
     /** Dequeue */
-    synchronized List<BlockTargetPair> poll(int numBlocks) {
+    synchronized List<E> poll(int numBlocks) {
       if (numBlocks <= 0 || blockq.isEmpty()) {
         return null;
       }
 
-      List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+      List<E> results = new ArrayList<E>();
       for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
         results.add(blockq.poll());
       }
       return results;
     }
+
+    /**
+     * Returns <tt>true</tt> if the queue contains the specified element.
+     */
+    boolean contains(E e) {
+      return blockq.contains(e);
+    }
   }
 
   private volatile BlockInfo blockList = null;
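
The genericized BlockQueue is small enough to reproduce as a self-contained class with a usage example (synchronization as in the patch; contains() is left unsynchronized there as well):

    import java.util.*;

    class BlockQueue<E> {
      private final Queue<E> blockq = new LinkedList<E>();

      synchronized int size() { return blockq.size(); }

      synchronized boolean offer(E e) { return blockq.offer(e); }

      /** Dequeue up to numBlocks elements; null if empty or numBlocks <= 0. */
      synchronized List<E> poll(int numBlocks) {
        if (numBlocks <= 0 || blockq.isEmpty())
          return null;
        List<E> results = new ArrayList<E>();
        for (; !blockq.isEmpty() && numBlocks > 0; numBlocks--)
          results.add(blockq.poll());
        return results;
      }

      boolean contains(E e) { return blockq.contains(e); }
    }

    public class BlockQueueDemo {
      public static void main(String[] args) {
        BlockQueue<String> q = new BlockQueue<String>();
        q.offer("blk_1"); q.offer("blk_2"); q.offer("blk_3");
        System.out.println(q.poll(2)); // [blk_1, blk_2]
        System.out.println(q.poll(5)); // [blk_3]
        System.out.println(q.poll(1)); // null -- queue drained
      }
    }
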
@@ -87,9 +98,10 @@
   protected boolean needKeyUpdate = false;
 
   /** A queue of blocks to be replicated by this datanode */
-  private BlockQueue replicateBlocks = new BlockQueue();
+  private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
   /** A queue of blocks to be recovered by this datanode */
-  private BlockQueue recoverBlocks = new BlockQueue();
+  private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
+                                new BlockQueue<BlockInfoUnderConstruction>();
   /** A set of blocks to be invalidated by this datanode */
   private Set<Block> invalidateBlocks = new TreeSet<Block>();
 
@@ -201,6 +213,21 @@
     blockList = b.listInsert(blockList, this);
   }
 
+  /**
+   * Replace specified old block with a new one in the DataNodeDescriptor.
+   * 
+   * @param oldBlock - block to be replaced
+   * @param newBlock - a replacement block
+   * @return the new block
+   */
+  BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
+    boolean done = removeBlock(oldBlock);
+    assert done : "Old block should belong to the data-node when replacing";
+    done = addBlock(newBlock);
+    assert done : "New block should not belong to the data-node when replacing";
+    return newBlock;
+  }
+
   void resetBlocks() {
     this.capacity = 0;
     this.remaining = 0;
@@ -262,15 +289,20 @@
    */
   void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
     assert(block != null && targets != null && targets.length > 0);
-    replicateBlocks.offer(block, targets);
+    replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
 
   /**
    * Store block recovery work.
    */
-  void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
-    assert(block != null && targets != null && targets.length > 0);
-    recoverBlocks.offer(block, targets);
+  void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
+    if(recoverBlocks.contains(block)) {
+      // this prevents adding the same block twice to the recovery queue
+      FSNamesystem.LOG.info("Block " + block +
+                            " is already in the recovery queue.");
+      return;
+    }
+    recoverBlocks.offer(block);
   }
 
   /**
@@ -308,10 +340,16 @@
         new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
   }
 
-  BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
-    List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
-    return blocktargetlist == null? null:
-        new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
+  BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
+    List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
+    if(blocks == null)
+      return null;
+    BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size());
+    for(BlockInfoUnderConstruction b : blocks) {
+      brCommand.add(new RecoveringBlock(
+          b, b.getExpectedLocations(), b.getBlockRecoveryId()));
+    }
+    return brCommand;
   }
 
   /**
@@ -361,11 +399,12 @@
     return blockarray;
   }
 
-  void reportDiff(BlocksMap blocksMap,
+  void reportDiff(BlockManager blockManager,
                   BlockListAsLongs newReport,
                   Collection<Block> toAdd,    // add to DatanodeDescriptor
                   Collection<Block> toRemove, // remove from DatanodeDescriptor
-                  Collection<Block> toInvalidate) { // should be removed from DN
+                  Collection<Block> toInvalidate, // should be removed from DN
+                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -373,29 +412,16 @@
     assert added : "Delimiting block cannot be present in the node";
     if(newReport == null)
       newReport = new BlockListAsLongs();
-    // scan the report and collect newly reported blocks
-    // Note we are taking special precaution to limit tmp blocks allocated
-    // as part this block report - which why block list is stored as longs
-    for (Block iblk : newReport) {
-      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
-      if(storedBlock == null) {
-        // If block is not in blocksMap it does not belong to any file
-        toInvalidate.add(new Block(iblk));
-        continue;
-      }
-      if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
-        // if the size differs from what is in the blockmap, then return
-        // the new block. addStoredBlock will then pick up the right size of this
-        // block and will update the block object in the BlocksMap
-        if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
-          toAdd.add(new Block(iblk));
-        } else {
-          toAdd.add(storedBlock);
-        }
-        continue;
-      }
+    // scan the report and process newly reported blocks
+    BlockReportIterator itBR = newReport.getBlockReportIterator();
+    while(itBR.hasNext()) {
+      Block iblk = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
+                                               toAdd, toInvalidate, toCorrupt);
       // move block to the head of the list
-      this.moveBlockToHead(storedBlock);
+      if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
+        this.moveBlockToHead(storedBlock);
     }
     // collect blocks that have not been reported
     // all of them are next to the delimiter
@@ -405,6 +431,105 @@
     this.removeBlock(delimiter);
   }
 
+  /**
+   * Process a block replica reported by the data-node.
+   * 
+   * <ol>
+   * <li>If the block is not known to the system (not in blocksMap) then the
+   * data-node should be notified to invalidate this block.</li>
+   * <li>If the reported replica is valid, that is, it has the same generation stamp
+   * and length as recorded on the name-node, then the replica location is
+   * added to the name-node.</li>
+   * <li>If the reported replica is not valid, then it is marked as corrupt,
+   * which triggers replication of the existing valid replicas.
+   * Corrupt replicas are removed from the system when the block
+   * is fully replicated.</li>
+   * </ol>
+   * 
+   * @param blockManager
+   * @param block reported block replica
+   * @param rState reported replica state
+   * @param toAdd add to DatanodeDescriptor
+   * @param toInvalidate missing blocks (not in the blocks map)
+   *        should be removed from the data-node
+   * @param toCorrupt replicas with unexpected length or generation stamp;
+   *        add to corrupt replicas
+   * @return the up-to-date stored block, or null if the reported block
+   *         is not in the blocks map
+   */
+  BlockInfo processReportedBlock(
+                  BlockManager blockManager,
+                  Block block,                // reported block replica
+                  ReplicaState rState,        // reported replica state
+                  Collection<Block> toAdd,    // add to DatanodeDescriptor
+                  Collection<Block> toInvalidate, // should be removed from DN
+                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
+    FSNamesystem.LOG.debug("Reported block " + block
+        + " on " + getName() + " size " + block.getNumBytes()
+        + " replicaState = " + rState);
+
+    // find block by blockId
+    BlockInfo storedBlock = blockManager.findStoredBlock(block.getBlockId());
+    if(storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      toInvalidate.add(new Block(block));
+      return null;
+    }
+
+    FSNamesystem.LOG.debug("In memory blockUCState = " + storedBlock.getBlockUCState());
+
+    // Block is on the DN
+    boolean isCorrupt = false;
+    switch(rState) {
+    case FINALIZED:
+      switch(storedBlock.getBlockUCState()) {
+      case COMPLETE:
+      case COMMITTED:
+        // This is a temporary hack until Block.equals() and compareTo()
+        // are changed not to take into account the generation stamp for searching
+        // in blocksMap
+        if(storedBlock.getGenerationStamp() != block.getGenerationStamp()) {
+          toInvalidate.add(new Block(block));
+          return storedBlock;
+        }
+
+        if(storedBlock.getGenerationStamp() != block.getGenerationStamp()
+            || storedBlock.getNumBytes() != block.getNumBytes())
+          isCorrupt = true;
+        break;
+      case UNDER_CONSTRUCTION:
+      case UNDER_RECOVERY:
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+            this, block, rState);
+      }
+      if(!isCorrupt && storedBlock.findDatanode(this) < 0)
+        if (storedBlock.getNumBytes() != block.getNumBytes()) {
+          toAdd.add(new Block(block));
+        } else {
+          toAdd.add(storedBlock);
+        }
+      break;
+    case RBW:
+    case RWR:
+      if(!storedBlock.isComplete())
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+                                                      this, block, rState);
+      else
+        isCorrupt = true;
+      break;
+    case RUR:       // should not be reported
+    case TEMPORARY: // should not be reported
+    default:
+      FSNamesystem.LOG.warn("Unexpected replica state " + rState
+          + " for block: " + storedBlock + 
+          " on " + getName() + " size " + storedBlock.getNumBytes());
+      break;
+    }
+    if(isCorrupt)
+        toCorrupt.add(storedBlock);
+    return storedBlock;
+  }
+
   /** Serialization for FSEditLog */
   void readFieldsFromFSEditLog(DataInput in) throws IOException {
     this.name = DeprecatedUTF8.readString(in);
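
processReportedBlock reduces to a decision over the pair (reported ReplicaState, stored BlockUCState). A compact standalone model of the core dispatch follows; it is simplified in that it only compares lengths (the patch additionally routes a mismatched generation stamp to toInvalidate) and drops the toAdd bookkeeping, and its enums merely mirror HdfsConstants:

    public class ReportedReplicaDemo {
      enum ReplicaState { FINALIZED, RBW, RWR, RUR, TEMPORARY }
      enum BlockUCState { UNDER_CONSTRUCTION, UNDER_RECOVERY, COMMITTED, COMPLETE }
      enum Verdict { ADD, CORRUPT, UC_UPDATE, IGNORE }

      /** Simplified: what to do with one reported replica. */
      static Verdict decide(ReplicaState reported, BlockUCState stored,
                            boolean lengthMatches) {
        switch (reported) {
        case FINALIZED:
          if (stored == BlockUCState.COMPLETE || stored == BlockUCState.COMMITTED)
            return lengthMatches ? Verdict.ADD : Verdict.CORRUPT;
          return Verdict.UC_UPDATE;  // expected replica of an open block
        case RBW:
        case RWR:
          // a write-in-progress replica of a finished block is corrupt
          return stored == BlockUCState.COMPLETE ? Verdict.CORRUPT
                                                 : Verdict.UC_UPDATE;
        default:                     // RUR, TEMPORARY: should not be reported
          return Verdict.IGNORE;
        }
      }

      public static void main(String[] args) {
        System.out.println(decide(ReplicaState.FINALIZED, BlockUCState.COMPLETE, true));      // ADD
        System.out.println(decide(ReplicaState.FINALIZED, BlockUCState.COMPLETE, false));     // CORRUPT
        System.out.println(decide(ReplicaState.RBW, BlockUCState.COMPLETE, true));            // CORRUPT
        System.out.println(decide(ReplicaState.RBW, BlockUCState.UNDER_CONSTRUCTION, true));  // UC_UPDATE
      }
    }
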

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Sep 30 22:57:30 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -191,7 +192,7 @@
    */
   INode unprotectedAddFile( String path, 
                             PermissionStatus permissions,
-                            Block[] blocks, 
+                            BlockInfo[] blocks, 
                             short replication,
                             long modificationTime,
                             long atime,
@@ -261,7 +262,8 @@
         // Add file->block mapping
         INodeFile newF = (INodeFile)newNode;
         for (int i = 0; i < nrBlocks; i++) {
-          newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
+          BlockInfo blockInfo = new BlockInfo(blocks[i], newF.getReplication());
+          newF.setBlock(i, getBlockManager().addINode(blockInfo, newF));
         }
       }
     }
@@ -271,27 +273,39 @@
   /**
    * Add a block to the file. Returns a reference to the added block.
    */
-  Block addBlock(String path, INode[] inodes, Block block
-      ) throws QuotaExceededException  {
+  BlockInfo addBlock(String path,
+                     INode[] inodes,
+                     Block block,
+                     DatanodeDescriptor targets[]
+  ) throws QuotaExceededException, IOException  {
     waitForReady();
 
     synchronized (rootDir) {
-      INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
+      assert inodes[inodes.length-1].isUnderConstruction() :
+        "INode should correspond to a file under construction";
+      INodeFileUnderConstruction fileINode = 
+        (INodeFileUnderConstruction)inodes[inodes.length-1];
 
       // check quota limits and updated space consumed
       updateCount(inodes, inodes.length-1, 0, 
-                  fileNode.getPreferredBlockSize()*fileNode.getReplication());
-      
-      // associate the new list of blocks with this file
-      BlockInfo blockInfo = getBlockManager().addINode(block, fileNode);
-      fileNode.addBlock(blockInfo);
+                  fileINode.getPreferredBlockSize()*fileINode.getReplication());
+
+      // associate new last block for the file
+      BlockInfoUnderConstruction blockInfo =
+        new BlockInfoUnderConstruction(
+            block,
+            fileINode.getReplication(),
+            BlockUCState.UNDER_CONSTRUCTION,
+            targets);
+      getBlockManager().addINode(blockInfo, fileINode);
+      fileINode.addBlock(blockInfo);
 
       NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                                     + path + " with " + block
                                     + " block is added to the in-memory "
                                     + "file system");
+      return blockInfo;
     }
-    return block;
   }
 
   /**
@@ -335,7 +349,7 @@
 
     synchronized (rootDir) {
       // modify file-> block and blocksMap
-      fileNode.removeBlock(block);
+      fileNode.removeLastBlock(block);
       getBlockManager().removeBlockFromMap(block);
       // If block is removed from blocksMap remove it from corruptReplicasMap
       getBlockManager().removeFromCorruptReplicasMap(block);
@@ -732,7 +746,7 @@
       }
       
       int index = 0;
-      for (Block b : newnode.getBlocks()) {
+      for (BlockInfo b : newnode.getBlocks()) {
         BlockInfo info = getBlockManager().addINode(b, newnode);
         newnode.setBlock(index, info); // inode refers to the block in BlocksMap
         index++;
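
The addINode contract change surfaces here and in BlocksMap.java above: the caller now constructs the concrete BlockInfo (plain, or the under-construction subclass for a new last block), and the map stores that exact instance unless it is already the one present. A toy rendering of the identity-based insert (toy classes, not HDFS code):

    import java.util.*;

    public class AddINodeContractDemo {
      static class BlockInfo {
        final long id;
        BlockInfo(long id) { this.id = id; }
        @Override public boolean equals(Object o) {
          return o instanceof BlockInfo && ((BlockInfo) o).id == id;
        }
        @Override public int hashCode() { return (int) id; }
      }
      static class BlockInfoUC extends BlockInfo {
        BlockInfoUC(long id) { super(id); }
      }

      public static void main(String[] args) {
        Map<BlockInfo, BlockInfo> blocksMap = new HashMap<BlockInfo, BlockInfo>();
        BlockInfo last = new BlockInfoUC(7);   // caller picks the concrete type
        BlockInfo info = blocksMap.get(last);
        if (info != last) {                    // identity check, as in the patch
          info = last;
          blocksMap.put(info, info);
        }
        System.out.println(blocksMap.get(new BlockInfo(7)) instanceof BlockInfoUC); // true
      }
    }
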

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Sep 30 22:57:30 2009
@@ -461,19 +461,9 @@
             blockSize = readLong(in);
           }
           // get blocks
-          Block blocks[] = null;
-          if (logVersion <= -14) {
-            blocks = readBlocks(in);
-          } else {
-            BlockTwo oldblk = new BlockTwo();
-            int num = in.readInt();
-            blocks = new Block[num];
-            for (int i = 0; i < num; i++) {
-              oldblk.readFields(in);
-              blocks[i] = new Block(oldblk.blkid, oldblk.len, 
-                                    Block.GRANDFATHER_GENERATION_STAMP);
-            }
-          }
+          boolean isFileUnderConstruction = (opcode == OP_ADD);
+          BlockInfo blocks[] = 
+            readBlocks(in, logVersion, isFileUnderConstruction, replication);
 
           // Older versions of HDFS do not store the block size in inode.
           // If the file has more than one block, use the size of the
@@ -521,7 +511,7 @@
                                                     path, permissions,
                                                     blocks, replication, 
                                                     mtime, atime, blockSize);
-          if (opcode == OP_ADD) {
+          if (isFileUnderConstruction) {
             numOpAdd++;
             //
             // Replace current node with a INodeUnderConstruction.
@@ -538,7 +528,7 @@
                                       clientMachine, 
                                       null);
             fsDir.replaceNode(path, node, cons);
-            fsNamesys.leaseManager.addLease(cons.clientName, path);
+            fsNamesys.leaseManager.addLease(cons.getClientName(), path);
           }
           break;
         } 
@@ -1247,12 +1237,26 @@
     return Long.parseLong(FSImage.readString(in));
   }
 
-  static private Block[] readBlocks(DataInputStream in) throws IOException {
+  static private BlockInfo[] readBlocks(
+      DataInputStream in,
+      int logVersion,
+      boolean isFileUnderConstruction,
+      short replication) throws IOException {
     int numBlocks = in.readInt();
-    Block[] blocks = new Block[numBlocks];
+    BlockInfo[] blocks = new BlockInfo[numBlocks];
+    Block blk = new Block();
+    BlockTwo oldblk = new BlockTwo();
     for (int i = 0; i < numBlocks; i++) {
-      blocks[i] = new Block();
-      blocks[i].readFields(in);
+      if (logVersion <= -14) {
+        blk.readFields(in);
+      } else {
+        oldblk.readFields(in);
+        blk.set(oldblk.blkid, oldblk.len, Block.GRANDFATHER_GENERATION_STAMP);
+      }
+      if(isFileUnderConstruction && i == numBlocks-1)
+        blocks[i] = new BlockInfoUnderConstruction(blk, replication);
+      else
+        blocks[i] = new BlockInfo(blk, replication);
     }
     return blocks;
   }
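
The reworked readBlocks folds the old log-version branch into the loop and types the last block of a file under construction as BlockInfoUnderConstruction. A stripped-down sketch of the "last block is special" shape (toy classes; the real code also reuses a scratch Block while reading, which assumes the BlockInfo constructors copy its fields rather than alias it):

    public class ReadBlocksDemo {
      static class BlockInfo { }
      static class BlockInfoUC extends BlockInfo { }

      /** Last block of an open file is materialized as under construction. */
      static BlockInfo[] materialize(int numBlocks, boolean underConstruction) {
        BlockInfo[] blocks = new BlockInfo[numBlocks];
        for (int i = 0; i < numBlocks; i++) {
          if (underConstruction && i == numBlocks - 1)
            blocks[i] = new BlockInfoUC();
          else
            blocks[i] = new BlockInfo();
        }
        return blocks;
      }

      public static void main(String[] args) {
        BlockInfo[] b = materialize(3, true);
        System.out.println(b[2] instanceof BlockInfoUC); // true
        System.out.println(b[0] instanceof BlockInfoUC); // false
      }
    }
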

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Sep 30 22:57:30 2009
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -1403,7 +1404,7 @@
       }
       INodeFile oldnode = (INodeFile) old;
       fsDir.replaceNode(path, oldnode, cons);
-      fs.leaseManager.addLease(cons.clientName, path); 
+      fs.leaseManager.addLease(cons.getClientName(), path); 
     }
   }
 
@@ -1419,10 +1420,17 @@
     int numBlocks = in.readInt();
     BlockInfo[] blocks = new BlockInfo[numBlocks];
     Block blk = new Block();
-    for (int i = 0; i < numBlocks; i++) {
+    int i = 0;
+    for (; i < numBlocks-1; i++) {
       blk.readFields(in);
       blocks[i] = new BlockInfo(blk, blockReplication);
     }
+    // last block is UNDER_CONSTRUCTION
+    if(numBlocks > 0) {
+      blk.readFields(in);
+      blocks[i] = new BlockInfoUnderConstruction(
+        blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+    }
     PermissionStatus perm = PermissionStatus.read(in);
     String clientName = readString(in);
     String clientMachine = readString(in);
@@ -1430,7 +1438,7 @@
     // These locations are not used at all
     int numLocs = in.readInt();
     DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
-    for (int i = 0; i < numLocs; i++) {
+    for (i = 0; i < numLocs; i++) {
       locations[i] = new DatanodeDescriptor();
       locations[i].readFields(in);
     }

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Sep 30 22:57:30 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -563,13 +564,18 @@
     }
     List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
     long totalSize = 0;
+    BlockInfo curBlock;
     while(totalSize<size && iter.hasNext()) {
-      totalSize += addBlock(iter.next(), results);
+      curBlock = iter.next();
+      if(!curBlock.isComplete())  continue;
+      totalSize += addBlock(curBlock, results);
     }
     if(totalSize<size) {
       iter = node.getBlockIterator(); // start from the beginning
       for(int i=0; i<startBlock&&totalSize<size; i++) {
-        totalSize += addBlock(iter.next(), results);
+        curBlock = iter.next();
+        if(!curBlock.isComplete())  continue;
+        totalSize += addBlock(curBlock, results);
       }
     }
 
@@ -707,14 +713,46 @@
     if (doAccessTime && isAccessTimeSupported()) {
       dir.setTimes(src, inode, -1, now(), false);
     }
-    final Block[] blocks = inode.getBlocks();
+    final BlockInfo[] blocks = inode.getBlocks();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
+    }
     if (blocks == null) {
       return null;
     }
-    final List<LocatedBlock> results = blocks.length == 0?
-        new ArrayList<LocatedBlock>(0):
-        blockManager.getBlockLocations(blocks, offset, length, Integer.MAX_VALUE);
-    return inode.createLocatedBlocks(results);
+
+    if (blocks.length == 0) {
+      return new LocatedBlocks(0, inode.isUnderConstruction(),
+          Collections.<LocatedBlock>emptyList(), null, false);
+    } else {
+      final long n = inode.computeFileSize(false);
+      final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
+          blocks, offset, length, Integer.MAX_VALUE);
+      final BlockInfo last = inode.getLastBlock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("last = " + last);
+      }
+
+      if (!last.isComplete()) {
+        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+            blockManager.getBlockLocation(last, n), false);
+      }
+      else {
+        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+            blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
+      }
+    }
+  }
+
+  /** Create a LocatedBlock. */
+  LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+      final long offset, final boolean corrupt) throws IOException {
+    final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
+    if (isAccessTokenEnabled) {
+      lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
+          EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+    }
+    return lb;
   }
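
The caller above computes the last block's offset from the file size n returned by computeFileSize(false); the two branches imply that n excludes an unfinished last block, so an incomplete last block starts exactly at n, while a complete one starts at n - last.getNumBytes(). A one-function sketch of that arithmetic:

    public class LastBlockOffsetDemo {
      /** Offset of the last block, per the new getBlockLocations logic. */
      static long lastBlockOffset(long fileSize, long lastLen, boolean complete) {
        // an incomplete last block is not counted in fileSize, so it
        // begins exactly at fileSize; a complete one ends there instead
        return complete ? fileSize - lastLen : fileSize;
      }

      public static void main(String[] args) {
        System.out.println(lastBlockOffset(1024, 256, true));  // 768
        System.out.println(lastBlockOffset(1024, 256, false)); // 1024
      }
    }
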
 
   /**
@@ -912,40 +950,45 @@
         // If the file is under construction , then it must be in our
         // leases. Find the appropriate lease record.
         //
-        Lease lease = leaseManager.getLease(holder);
-        //
-        // We found the lease for this file. And surprisingly the original
-        // holder is trying to recreate this file. This should never occur.
-        //
-        if (lease != null) {
+        Lease lease = leaseManager.getLeaseByPath(src);
+        if (lease == null) {
           throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 " because current leaseholder is trying to recreate file.");
+              "failed to create file " + src + " for " + holder +
+              " on client " + clientMachine + 
+              " because pendingCreates is non-null but no leases found.");
         }
         //
-        // Find the original holder.
+        // We found the lease for this file. And surprisingly the original
+        // holder is trying to recreate this file. This should never occur.
         //
-        lease = leaseManager.getLease(pendingFile.clientName);
-        if (lease == null) {
+        if (lease.getHolder().equals(holder)) {
           throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 " because pendingCreates is non-null but no leases found.");
-        }
+              "failed to create file " + src + " for " + holder +
+              " on client " + clientMachine + 
+              " because current leaseholder is trying to recreate file.");
+        }
+        assert lease.getHolder().equals(pendingFile.getClientName()) :
+          "Current lease holder " + lease.getHolder() +
+          " does not match file creator " + pendingFile.getClientName();
         //
+        // Current lease holder is different from the requester.
         // If the original holder has not renewed in the last SOFTLIMIT 
-        // period, then start lease recovery.
+        // period, then start lease recovery, otherwise fail.
         //
         if (lease.expiredSoftLimit()) {
           LOG.info("startFile: recover lease " + lease + ", src=" + src);
-          internalReleaseLease(lease, src);
-        }
-        throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
-                                               " on client " + clientMachine + 
-                                               ", because this file is already being created by " +
-                                               pendingFile.getClientName() + 
-                                               " on " + pendingFile.getClientMachine());
+          boolean isClosed = internalReleaseLease(lease, src, null);
+          if(!isClosed)
+            throw new RecoveryInProgressException(
+                "Failed to close file " + src +
+                ". Lease recovery is in progress. Try again later.");
+
+        } else
+          throw new AlreadyBeingCreatedException("failed to create file " +
+              src + " for " + holder + " on client " + clientMachine + 
+              ", because this file is already being created by " +
+              pendingFile.getClientName() + 
+              " on " + pendingFile.getClientMachine());
       }
 
       try {
@@ -998,7 +1041,7 @@
                                         clientMachine,
                                         clientNode);
         dir.replaceNode(src, node, cons);
-        leaseManager.addLease(cons.clientName, src);
+        leaseManager.addLease(cons.getClientName(), src);
 
       } else {
        // Now we can add the name to the filesystem. This file has no
@@ -1014,7 +1057,7 @@
           throw new IOException("DIR* NameSystem.startFile: " +
                                 "Unable to add file to namespace.");
         }
-        leaseManager.addLease(newNode.clientName, src);
+        leaseManager.addLease(newNode.getClientName(), src);
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
                                      +"add "+src+" to namespace for "+holder);
@@ -1048,40 +1091,36 @@
     LocatedBlock lb = null;
     synchronized (this) {
       INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
-
-      BlockInfo[] blocks = file.getBlocks();
-      if (blocks != null && blocks.length > 0) {
-        BlockInfo last = blocks[blocks.length-1];
-        // this is a redundant search in blocksMap
-        // should be resolved by the new implementation of append
-        BlockInfo storedBlock = blockManager.getStoredBlock(last);
-        assert last == storedBlock : "last block should be in the blocksMap";
-        if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
+      BlockInfo lastBlock = file.getLastBlock();
+      if (lastBlock != null) {
+        assert lastBlock == blockManager.getStoredBlock(lastBlock) :
+          "last block of the file is not in blocksMap";
+        if (file.getPreferredBlockSize() > lastBlock.getNumBytes()) {
           long fileLength = file.computeContentSummary().getLength();
-          DatanodeDescriptor[] targets = blockManager.getNodes(storedBlock);
+          DatanodeDescriptor[] targets = blockManager.getNodes(lastBlock);
           // remove the replica locations of this block from the node
           for (int i = 0; i < targets.length; i++) {
-            targets[i].removeBlock(storedBlock);
+            targets[i].removeBlock(lastBlock);
           }
-          // set the locations of the last block in the lease record
-          file.setLastBlock(storedBlock, targets);
+          // convert last block to under-construction and set its locations
+          blockManager.convertLastBlockToUnderConstruction(file, targets);
 
-          lb = new LocatedBlock(last, targets, 
-                                fileLength-storedBlock.getNumBytes());
+          lb = new LocatedBlock(lastBlock, targets, 
+                                fileLength-lastBlock.getNumBytes());
           if (isAccessTokenEnabled) {
             lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
                 .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
           }
 
           // Remove block from replication queue.
-          blockManager.updateNeededReplications(last, 0, 0);
+          blockManager.updateNeededReplications(lastBlock, 0, 0);
 
           // remove this block from the list of pending blocks to be deleted. 
           // This reduces the possibility of triggering HADOOP-1349.
           //
           for (DatanodeDescriptor dd : targets) {
             String datanodeId = dd.getStorageID();
-            blockManager.removeFromInvalidates(datanodeId, last);
+            blockManager.removeFromInvalidates(datanodeId, lastBlock);
           }
         }
       }
@@ -1115,7 +1154,8 @@
    * client to "try again later".
    */
   public LocatedBlock getAdditionalBlock(String src, 
-                                         String clientName
+                                         String clientName,
+                                         Block previous
                                          ) throws IOException {
     long fileLength, blockSize;
     int replication;
@@ -1135,6 +1175,9 @@
 
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
 
+      // commit the last block
+      blockManager.commitLastBlock(pendingFile, previous);
+
       //
       // If we fail this, bad things happen!
       //
@@ -1168,9 +1211,11 @@
         throw new NotReplicatedYetException("Not replicated yet:" + src);
       }
 
+      // complete the penultimate block
+      blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
+
       // allocate new block record block locations in INode.
-      newBlock = allocateBlock(src, pathINodes);
-      pendingFile.setTargets(targets);
+      newBlock = allocateBlock(src, pathINodes, targets);
       
       for (DatanodeDescriptor dn : targets) {
         dn.incBlocksScheduled();
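
Taken together with the completeFile(src, holder, last) change below, the new previous parameter gives the write path explicit commit points: each addBlock call commits the preceding block, and complete commits the last one. A hypothetical condensation of that call sequence (the Namenode interface here is a stand-in, not the real ClientProtocol):

    import java.io.IOException;

    public class AppendPipelineSketch {
      interface Namenode {  // illustrative stand-in
        String addBlock(String src, String client, String previous) throws IOException;
        void complete(String src, String client, String last) throws IOException;
      }

      static void writeTwoBlocks(Namenode nn, String src, String client)
          throws IOException {
        String b1 = nn.addBlock(src, client, null); // nothing to commit yet
        // ... client streams b1 through the datanode pipeline ...
        String b2 = nn.addBlock(src, client, b1);   // commits b1 on the name-node
        // ... client streams b2 ...
        nn.complete(src, client, b2);               // commits b2 and closes src
      }

      public static void main(String[] args) throws IOException {
        Namenode nn = new Namenode() {
          int next = 1;
          public String addBlock(String s, String c, String prev) {
            System.out.println("addBlock: commit previous=" + prev);
            return "blk_" + next++;
          }
          public void complete(String s, String c, String last) {
            System.out.println("complete: commit last=" + last);
          }
        };
        writeTwoBlocks(nn, "/f", "client-1");
      }
    }
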
@@ -1250,15 +1295,18 @@
     COMPLETE_SUCCESS
   }
   
-  public CompleteFileStatus completeFile(String src, String holder) throws IOException {
-    CompleteFileStatus status = completeFileInternal(src, holder);
+  public CompleteFileStatus completeFile(String src,
+                                         String holder,
+                                         Block last) throws IOException {
+    CompleteFileStatus status = completeFileInternal(src, holder, last);
     getEditLog().logSync();
     return status;
   }
 
-
-  private synchronized CompleteFileStatus completeFileInternal(String src, 
-                                                String holder) throws IOException {
+  private synchronized CompleteFileStatus completeFileInternal(
+                                            String src, 
+                                            String holder,
+                                            Block last) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
     if (isInSafeMode())
       throw new SafeModeException("Cannot complete file " + src, safeMode);
@@ -1279,7 +1327,12 @@
                                      ("from " + pendingFile.getClientMachine()))
                                   );                      
       return CompleteFileStatus.OPERATION_FAILED;
-    } else if (!checkFileProgress(pendingFile, true)) {
+    } 
+
+    // commit the last block
+    blockManager.commitLastBlock(pendingFile, last);
+
+    if (!checkFileProgress(pendingFile, true)) {
       return CompleteFileStatus.STILL_WAITING;
     }
 
@@ -1312,13 +1365,15 @@
    * @param inodes INode representing each of the components of src. 
    *        <code>inodes[inodes.length-1]</code> is the INode for the file.
    */
-  private Block allocateBlock(String src, INode[] inodes) throws IOException {
+  private Block allocateBlock(String src,
+                              INode[] inodes,
+                              DatanodeDescriptor targets[]) throws IOException {
     Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0); 
     while(isValidBlock(b)) {
       b.setBlockId(FSNamesystem.randBlockId.nextLong());
     }
     b.setGenerationStamp(getGenerationStamp());
-    b = dir.addBlock(src, inodes, b);
+    b = dir.addBlock(src, inodes, b, targets);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
                                  +src+ ". "+b);
     return b;
@@ -1329,12 +1384,12 @@
    * replicated.  If not, return false. If checkall is true, then check
    * all blocks, otherwise check only penultimate block.
    */
-  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
+  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) throws IOException {
     if (checkall) {
       //
       // check all blocks of the file.
       //
-      for (Block block: v.getBlocks()) {
+      for (BlockInfo block: v.getBlocks()) {
         if (!blockManager.checkMinReplication(block)) {
           return false;
         }
@@ -1343,7 +1398,7 @@
       //
       // check the penultimate block of this file
       //
-      Block b = v.getPenultimateBlock();
+      BlockInfo b = v.getPenultimateBlock();
       if (b != null && !blockManager.checkMinReplication(b)) {
         return false;
       }
@@ -1614,20 +1669,31 @@
    * Move a file that is being written to be immutable.
    * @param src The filename
    * @param lease The lease for the client creating the file
-   */
-  void internalReleaseLease(Lease lease, String src) throws IOException {
+   * @param recoveryLeaseHolder reassign lease to this holder if the last block
+   *        needs recovery; keep current holder if null.
+   * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal
+   *         replication;<br>
+   *         RecoveryInProgressException if lease recovery is in progress.<br>
+   *         IOException in case of an error.
+   * @return true  if file has been successfully finalized and closed or 
+   *         false if block recovery has been initiated
+   */
+  boolean internalReleaseLease(
+      Lease lease, String src, String recoveryLeaseHolder)
+  throws AlreadyBeingCreatedException,
+         IOException {
     LOG.info("Recovering lease=" + lease + ", src=" + src);
 
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
-      final String message = "DIR* NameSystem.internalReleaseCreate: "
+      final String message = "DIR* NameSystem.internalReleaseLease: "
         + "attempt to release a create lock on "
         + src + " file does not exist.";
       NameNode.stateChangeLog.warn(message);
       throw new IOException(message);
     }
     if (!iFile.isUnderConstruction()) {
-      final String message = "DIR* NameSystem.internalReleaseCreate: "
+      final String message = "DIR* NameSystem.internalReleaseLease: "
         + "attempt to release a create lock on "
         + src + " but file is already closed.";
       NameNode.stateChangeLog.warn(message);
@@ -1635,39 +1701,123 @@
     }
 
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
+    int nrBlocks = pendingFile.numBlocks();
+    BlockInfo[] blocks = pendingFile.getBlocks();
 
-    // Initialize lease recovery for pendingFile. If there are no blocks 
-    // associated with this file, then reap lease immediately. Otherwise 
-    // renew the lease and trigger lease recovery.
-    if (pendingFile.getTargets() == null ||
-        pendingFile.getTargets().length == 0) {
-      if (pendingFile.getBlocks().length == 0) {
+    int nrCompleteBlocks;
+    BlockInfo curBlock = null;
+    for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
+      curBlock = blocks[nrCompleteBlocks];
+      if(!curBlock.isComplete())
+        break;
+      assert blockManager.checkMinReplication(curBlock) :
+              "A COMPLETE block is not minimally replicated in " + src;
+    }
+
+    // If there are no incomplete blocks associated with this file,
+    // then reap lease immediately and close the file.
+    if(nrCompleteBlocks == nrBlocks) {
+      finalizeINodeFileUnderConstruction(src, pendingFile);
+      NameNode.stateChangeLog.warn("BLOCK*"
+        + " internalReleaseLease: All existing blocks are COMPLETE,"
+        + " lease removed, file closed.");
+      return true;  // closed!
+    }
+
+    // Only the last and the penultimate blocks may be in non COMPLETE state.
+    // If the penultimate block is not COMPLETE, then it must be COMMITTED.
+    if(nrCompleteBlocks < nrBlocks - 2 ||
+       (nrCompleteBlocks == nrBlocks - 2 &&
+        curBlock.getBlockUCState() != BlockUCState.COMMITTED)) {
+      final String message = "DIR* NameSystem.internalReleaseLease: "
+        + "attempt to release lease on " + src
+        + " but only the last and penultimate blocks may be non-COMPLETE.";
+      NameNode.stateChangeLog.warn(message);
+      throw new IOException(message);
+    }
+
+    // Now we know that the last block is not COMPLETE, and
+    // that the penultimate block, if it exists, is either COMPLETE or COMMITTED.
+    BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+    BlockUCState lastBlockState = lastBlock.getBlockUCState();
+    BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+    BlockUCState penultimateBlockState = (penultimateBlock == null ?
+        BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
+    assert penultimateBlockState == BlockUCState.COMPLETE ||
+           penultimateBlockState == BlockUCState.COMMITTED :
+           "Unexpected state of penultimate block in " + src;
+
+    switch(lastBlockState) {
+    case COMPLETE:
+      assert false : "Already checked that the last block is incomplete";
+      break;
+    case COMMITTED:
+      // Close file if committed blocks are minimally replicated
+      if(blockManager.checkMinReplication(penultimateBlock) &&
+          blockManager.checkMinReplication(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile);
         NameNode.stateChangeLog.warn("BLOCK*"
-          + " internalReleaseLease: No blocks found, lease removed.");
-        return;
-      }
-      // setup the Inode.targets for the last block from the blockManager
-      //
-      BlockInfo[] blocks = pendingFile.getBlocks();
-      BlockInfo last = blocks[blocks.length-1];
-      DatanodeDescriptor[] targets = blockManager.getNodes(last);
-      pendingFile.setTargets(targets);
+          + " internalReleaseLease: Committed blocks are minimally replicated,"
+          + " lease removed, file closed.");
+        return true;  // closed!
+      }
+      // Cannot close file right now, since some blocks 
+      // are not yet minimally replicated.
+      // This may potentially cause an infinite loop in lease recovery
+      // if there are no valid replicas on data-nodes.
+      String message = "DIR* NameSystem.internalReleaseLease: " +
+          "Failed to release lease for file " + src +
+          ". Committed blocks are waiting to be minimally replicated." +
+          " Try again later.";
+      NameNode.stateChangeLog.warn(message);
+      throw new AlreadyBeingCreatedException(message);
+    case UNDER_CONSTRUCTION:
+    case UNDER_RECOVERY:
+      // setup the last block locations from the blockManager if not known
+      if(lastBlock.getNumExpectedLocations() == 0)
+        lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock));
+      // start recovery of the last block for this file
+      long blockRecoveryId = nextGenerationStamp();
+      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+      lastBlock.initializeBlockRecovery(blockRecoveryId);
+      leaseManager.renewLease(lease);
+      // Cannot close file right now, since the last block requires recovery.
+      // This may potentially cause an infinite loop in lease recovery
+      // if there are no valid replicas on data-nodes.
+      NameNode.stateChangeLog.warn(
+          "DIR* NameSystem.internalReleaseLease: " +
+          "File " + src + " has not been closed." +
+          " Lease recovery is in progress. " +
+          "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+      break;
     }
-    // start lease recovery of the last block for this file.
-    pendingFile.assignPrimaryDatanode();
-    leaseManager.renewLease(lease);
+    return false;
+  }
+
+  Lease reassignLease(Lease lease, String src, String newHolder,
+                      INodeFileUnderConstruction pendingFile) {
+    if(newHolder == null)
+      return lease;
+    pendingFile.setClientName(newHolder);
+    return leaseManager.reassignLease(lease, src, newHolder);
   }
 
-  private void finalizeINodeFileUnderConstruction(String src,
+
+  private void finalizeINodeFileUnderConstruction(
+      String src,
       INodeFileUnderConstruction pendingFile) throws IOException {
-    leaseManager.removeLease(pendingFile.clientName, src);
+    leaseManager.removeLease(pendingFile.getClientName(), src);
+
+    // complete the penultimate block
+    blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
 
     // The file is no longer pending.
-    // Create permanent INode, update blockmap
+    // Create permanent INode, update blocks
     INodeFile newFile = pendingFile.convertToInodeFile();
     dir.replaceNode(src, pendingFile, newFile);
 
+    // complete the last block of the file
+    blockManager.completeBlock(newFile, newFile.numBlocks()-1);
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);
 
@@ -1690,11 +1840,20 @@
       throw new IOException("Block (=" + lastblock + ") not found");
     }
     INodeFile iFile = oldblockinfo.getINode();
-    if (!iFile.isUnderConstruction()) {
+    if (!iFile.isUnderConstruction() || oldblockinfo.isComplete()) {
       throw new IOException("Unexpected block (=" + lastblock
           + ") since the file (=" + iFile.getLocalName()
           + ") is not under construction");
     }
+
+    long recoveryId =
+      ((BlockInfoUnderConstruction)oldblockinfo).getBlockRecoveryId();
+    if(recoveryId != newgenerationstamp) {
+      throw new IOException("The recovery id " + newgenerationstamp
+          + " does not match current recovery id "
+          + recoveryId + " for block " + lastblock); 
+    }
+        
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
 
 
@@ -1703,12 +1862,15 @@
     blockManager.removeBlockFromMap(oldblockinfo);
 
     if (deleteblock) {
-      pendingFile.removeBlock(lastblock);
+      pendingFile.removeLastBlock(lastblock);
     }
     else {
       // update last block, construct newblockinfo and add it to the blocks map
       lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
-      final BlockInfo newblockinfo = blockManager.addINode(lastblock, pendingFile);
+      BlockInfoUnderConstruction newblockinfo = 
+        new BlockInfoUnderConstruction(
+            lastblock, pendingFile.getReplication());
+      blockManager.addINode(newblockinfo, pendingFile);
 
       // find the DatanodeDescriptor objects
       // There should be no locations in the blockManager till now because the
@@ -1727,11 +1889,9 @@
         for (int i = 0; i < descriptors.length; i++) {
           descriptors[i].addBlock(newblockinfo);
         }
-        pendingFile.setLastBlock(newblockinfo, null);
-      } else {
-        // add locations into the INodeUnderConstruction
-        pendingFile.setLastBlock(newblockinfo, descriptors);
       }
+      // add locations into the INodeUnderConstruction
+      pendingFile.setLastBlock(newblockinfo, descriptors);
     }
 
     // If this commit does not want to close the file, persist
@@ -1745,7 +1905,10 @@
       LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
       return;
     }
-    
+
+    // commit the last block
+    blockManager.commitLastBlock(pendingFile, lastblock);
+
     //remove lease, close file
     finalizeINodeFileUnderConstruction(src, pendingFile);
     getEditLog().logSync();
@@ -3342,7 +3505,7 @@
   void setBlockTotal() {
     if (safeMode == null)
       return;
-    safeMode.setBlockTotal((int)getBlocksTotal());
+    safeMode.setBlockTotal((int)getCompleteBlocksTotal());
   }
 
   /**
@@ -3353,6 +3516,33 @@
   }
 
   /**
+   * Get the total number of COMPLETE blocks in the system.
+   * For safe mode only complete blocks are counted.
+   */
+  long getCompleteBlocksTotal() {
+    // Calculate number of blocks under construction
+    long numUCBlocks = 0;
+    for (Lease lease : leaseManager.getSortedLeases()) {
+      for(String path : lease.getPaths()) {
+        INode node = dir.getFileINode(path);
+        assert node != null : "Found a lease for a non-existent file.";
+        assert node.isUnderConstruction() :
+          "Found a lease for a file that is not under construction.";
+        INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+        BlockInfo[] blocks = cons.getBlocks();
+        if(blocks == null)
+          continue;
+        for(BlockInfo b : blocks) {
+          if(!b.isComplete())
+            numUCBlocks++;
+        }
+      }
+    }
+    LOG.info("Number of blocks under construction: " + numUCBlocks);
+    return getBlocksTotal() - numUCBlocks;
+  }
+
+  /**
    * Enter safe mode manually.
    * @throws IOException
    */
@@ -3671,29 +3861,129 @@
     return gs;
   }
 
+  private INodeFileUnderConstruction checkUCBlock(Block block, String clientName) 
+  throws IOException {
+    // check safe mode
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot get a new generation stamp and an " +
+                                "access token for block " + block, safeMode);
+    
+    // check stored block state
+    BlockInfo storedBlock = blockManager.getStoredBlock(block);
+    if (storedBlock == null || 
+        storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
+        throw new IOException(block +
+            " does not exist or is not under construction: " + storedBlock);
+    }
+    
+    // check file inode
+    INodeFile file = storedBlock.getINode();
+    if (file==null || !file.isUnderConstruction()) {
+      throw new IOException("The file " + storedBlock + 
+          " is belonged to does not exist or it is not under construction.");
+    }
+    
+    // check lease
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
+    if (clientName == null || !clientName.equals(pendingFile.getClientName())) {
+      throw new LeaseExpiredException("Lease mismatch: " + block +
+          " is accessed by a non-lease holder " + clientName);
+    }
+
+    return pendingFile;
+  }
+  
   /**
-   * Verifies that the block is associated with a file that has a lease.
-   * Increments, logs and then returns the stamp
+   * Get a new generation stamp together with an access token for 
+   * a block under construction
+   * 
+   * This method is called for recovering a failed pipeline or setting up
+   * a pipeline to append to a block.
+   * 
+   * @param block a block
+   * @param clientName the name of a client
+   * @return a located block with a new generation stamp and an access token
+   * @throws IOException if any error occurs
+   */
+  synchronized LocatedBlock updateBlockForPipeline(Block block, 
+      String clientName) throws IOException {
+    // check validity of parameters
+    checkUCBlock(block, clientName);
+    
+    // get a new generation stamp and an access token
+    block.setGenerationStamp(nextGenerationStamp());
+    LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
+    if (isAccessTokenEnabled) {
+      locatedBlock.setAccessToken(accessTokenHandler.generateToken(
+          block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+    }
+    return locatedBlock;
+  }
+  
+  
+  /**
+   * Update a pipeline for a block under construction
+   * 
+   * @param clientName the name of the client
+   * @param oldBlock the old block
+   * @param newBlock a new block with a new generation stamp and length
+   * @param newNodes datanodes in the pipeline
+   * @throws IOException if any error occurs
    */
-  synchronized long nextGenerationStampForBlock(Block block) throws IOException {
-    BlockInfo storedBlock = blockManager.getStoredBlock(block);
-    if (storedBlock == null) {
-      String msg = block + " is already commited, storedBlock == null.";
-      LOG.info(msg);
+  synchronized void updatePipeline(String clientName, Block oldBlock, 
+      Block newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    LOG.info("updatePipeline(block=" + oldBlock
+        + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+        + ", newLength=" + newBlock.getNumBytes()
+        + ", newNodes=" + Arrays.asList(newNodes)
+        + ", clientName=" + clientName
+        + ")");
+
+    // check the validity of the block and lease holder name
+    final INodeFileUnderConstruction pendingFile = 
+      checkUCBlock(oldBlock, clientName);
+    final BlockInfo oldblockinfo = pendingFile.getLastBlock();
+
+    // check the new GS & length: regressing to an older state is not expected
+    if (newBlock.getGenerationStamp() <= oldblockinfo.getGenerationStamp() ||
+        newBlock.getNumBytes() < oldblockinfo.getNumBytes()) {
+      String msg = "Update " + oldBlock + " (len = " + 
+      oldblockinfo.getNumBytes() + ") to an older state: " + newBlock + 
+      " (len = " + newBlock.getNumBytes() +")";
+      LOG.warn(msg);
       throw new IOException(msg);
     }
-    INodeFile fileINode = storedBlock.getINode();
-    if (!fileINode.isUnderConstruction()) {
-      String msg = block + " is already commited, !fileINode.isUnderConstruction().";
-      LOG.info(msg);
-      throw new IOException(msg);
+    
+    // Remove old block from blocks map. This always has to be done
+    // because the generation stamp of this block is changing.
+    blockManager.removeBlockFromMap(oldblockinfo);
+
+    // update last block, construct newblockinfo and add it to the blocks map
+    BlockInfoUnderConstruction newblockinfo = 
+      new BlockInfoUnderConstruction(
+          newBlock, pendingFile.getReplication());
+    blockManager.addINode(newblockinfo, pendingFile);
+
+    // find the DatanodeDescriptor objects
+    DatanodeDescriptor[] descriptors = null;
+    if (newNodes.length > 0) {
+      descriptors = new DatanodeDescriptor[newNodes.length];
+      for(int i = 0; i < newNodes.length; i++) {
+        descriptors[i] = getDatanode(newNodes[i]);
+      }
     }
-    if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
-      String msg = block + " is beening recovered, ignoring this request.";
-      LOG.info(msg);
-      throw new IOException(msg);
+    // add locations into the INodeUnderConstruction
+    pendingFile.setLastBlock(newblockinfo, descriptors);
+
+    // persist blocks only if append is supported
+    String src = leaseManager.findPath(pendingFile);
+    if (supportAppends) {
+      dir.persistBlocks(src, pendingFile);
+      getEditLog().logSync();
     }
-    return nextGenerationStamp();
+    LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
+    return;
   }
 
   // rename was successful. If any part of the renamed subtree had

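As context for the internalReleaseLease() changes above, the following is a
standalone sketch (not part of this patch; all names are hypothetical
stand-ins) of the block-state invariant the method now enforces before it
decides whether a lease can be released: every block except the last two must
be COMPLETE, and a non-COMPLETE penultimate block must be COMMITTED.

    import java.util.Arrays;

    public class BlockStateCheck {
      enum BlockUCState { COMPLETE, COMMITTED, UNDER_CONSTRUCTION, UNDER_RECOVERY }

      /** @return true if the block states permit lease release to proceed */
      static boolean statesAllowRelease(BlockUCState[] blocks) {
        int n = blocks.length;
        // count the leading run of COMPLETE blocks
        int complete = 0;
        while (complete < n && blocks[complete] == BlockUCState.COMPLETE)
          complete++;
        if (complete == n)
          return true;               // all blocks COMPLETE: file can close
        if (complete < n - 2)
          return false;              // an interior block is not COMPLETE
        if (complete == n - 2 && blocks[complete] != BlockUCState.COMMITTED)
          return false;              // penultimate block must be COMMITTED
        return true;                 // only the last block needs attention
      }

      public static void main(String[] args) {
        BlockUCState[] ok  = { BlockUCState.COMPLETE, BlockUCState.COMMITTED,
                               BlockUCState.UNDER_RECOVERY };
        BlockUCState[] bad = { BlockUCState.UNDER_CONSTRUCTION, BlockUCState.COMPLETE };
        System.out.println(Arrays.toString(ok)  + " -> " + statesAllowRelease(ok));  // true
        System.out.println(Arrays.toString(bad) + " -> " + statesAllowRelease(bad)); // false
      }
    }
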
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Sep 30 22:57:30 2009
@@ -25,8 +25,6 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 
 /**
  * We keep an in-memory representation of the file/block hierarchy.
@@ -423,10 +421,4 @@
     }
     return null;
   }
-  
-  
-  LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
-    return new LocatedBlocks(computeContentSummary().getLength(), blocks,
-        isUnderConstruction());
-  }
 }

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Sep 30 22:57:30 2009
@@ -122,16 +122,29 @@
 
   /** {@inheritDoc} */
   long[] computeContentSummary(long[] summary) {
-    long bytes = 0;
-    for(Block blk : blocks) {
-      bytes += blk.getNumBytes();
-    }
-    summary[0] += bytes;
+    summary[0] += computeFileSize(true);
     summary[1]++;
     summary[3] += diskspaceConsumed();
     return summary;
   }
 
+  /** Compute file size.
+   * The last block is excluded from the total when it is still
+   * under construction and includesBlockInfoUnderConstruction is false.
+   */
+  long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+    if (blocks == null || blocks.length == 0) {
+      return 0;
+    }
+    final int last = blocks.length - 1;
+    //check if the last block is BlockInfoUnderConstruction
+    long bytes = (blocks[last] instanceof BlockInfoUnderConstruction
+                  && !includesBlockInfoUnderConstruction)
+                 ? 0 : blocks[last].getNumBytes();
+    for(int i = 0; i < last; i++) {
+      bytes += blocks[i].getNumBytes();
+    }
+    return bytes;
+  }
   
 
   @Override
@@ -173,13 +186,14 @@
   /**
    * Return the penultimate allocated block for this file.
    */
-  Block getPenultimateBlock() {
+  BlockInfo getPenultimateBlock() {
     if (blocks == null || blocks.length <= 1) {
       return null;
     }
     return blocks[blocks.length - 2];
   }
 
+  // TODO: this method is not used anywhere - remove it
   INodeFileUnderConstruction toINodeFileUnderConstruction(
       String clientName, String clientMachine, DatanodeDescriptor clientNode
       ) throws IOException {
@@ -191,4 +205,27 @@
         blocks, getPermissionStatus(),
         clientName, clientMachine, clientNode);
   }
+
+  /**
+   * Get the last block of the file.
+   * Make sure it has the right type.
+   */
+  <T extends BlockInfo> T getLastBlock() throws IOException {
+    if (blocks == null || blocks.length == 0)
+      return null;
+    T returnBlock = null;
+    try {
+      @SuppressWarnings("unchecked")  // ClassCastException is caught below
+      T tBlock = (T)blocks[blocks.length - 1];
+      returnBlock = tBlock;
+    } catch(ClassCastException cce) {
+      throw new IOException("Unexpected last block type: " 
+          + blocks[blocks.length - 1].getClass().getSimpleName());
+    }
+    return returnBlock;
+  }
+
+  int numBlocks() {
+    return blocks == null ? 0 : blocks.length;
+  }
 }

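To illustrate the computeFileSize() change above, here is a minimal
self-contained sketch (not part of this patch; Block and UCBlock are
hypothetical stand-ins) of the same rule: sum all block lengths, but drop the
last block from the total when it is under construction and the caller asked
to exclude it.

    public class FileSizeSketch {
      static class Block { long numBytes; Block(long n) { numBytes = n; } }
      static class UCBlock extends Block { UCBlock(long n) { super(n); } } // under construction

      static long computeFileSize(Block[] blocks, boolean includeUnderConstruction) {
        if (blocks == null || blocks.length == 0)
          return 0;
        int last = blocks.length - 1;
        // the last block may still be growing; optionally leave it out
        long bytes = (blocks[last] instanceof UCBlock && !includeUnderConstruction)
                     ? 0 : blocks[last].numBytes;
        for (int i = 0; i < last; i++)
          bytes += blocks[i].numBytes;
        return bytes;
      }

      public static void main(String[] args) {
        Block[] blocks = { new Block(64), new Block(64), new UCBlock(10) };
        System.out.println(computeFileSize(blocks, true));   // 138
        System.out.println(computeFileSize(blocks, false));  // 128
      }
    }
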
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Wed Sep 30 22:57:30 2009
@@ -21,16 +21,13 @@
 
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 
 
 class INodeFileUnderConstruction extends INodeFile {
-  final String clientName;         // lease holder
+  private String clientName;          // lease holder
   private final String clientMachine;
   private final DatanodeDescriptor clientNode; // if client is a cluster node too.
-
-  private int primaryNodeIndex = -1; //the node working on lease recovery
-  private DatanodeDescriptor[] targets = null;   //locations for last block
-  private long lastRecoveryTime = 0;
   
   INodeFileUnderConstruction(PermissionStatus permissions,
                              short replication,
@@ -67,6 +64,10 @@
     return clientName;
   }
 
+  void setClientName(String clientName) {
+    this.clientName = clientName;
+  }
+
   String getClientMachine() {
     return clientMachine;
   }
@@ -83,15 +84,6 @@
     return true;
   }
 
-  DatanodeDescriptor[] getTargets() {
-    return targets;
-  }
-
-  void setTargets(DatanodeDescriptor[] targets) {
-    this.targets = targets;
-    this.primaryNodeIndex = -1;
-  }
-
   //
   // converts a INodeFileUnderConstruction into a INodeFile
   // use the modification time as the access time
@@ -108,10 +100,10 @@
   }
 
   /**
-   * remove a block from the block list. This block should be
+   * Remove a block from the block list. This block should be
    * the last one on the list.
    */
-  void removeBlock(Block oldblock) throws IOException {
+  void removeLastBlock(Block oldblock) throws IOException {
     if (blocks == null) {
       throw new IOException("Trying to delete non-existant block " + oldblock);
     }
@@ -124,57 +116,24 @@
     BlockInfo[] newlist = new BlockInfo[size_1];
     System.arraycopy(blocks, 0, newlist, 0, size_1);
     blocks = newlist;
-    
-    // Remove the block locations for the last block.
-    targets = null;
-  }
-
-  synchronized void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
-      ) throws IOException {
-    if (blocks == null) {
-      throw new IOException("Trying to update non-existant block (newblock="
-          + newblock + ")");
-    }
-    blocks[blocks.length - 1] = newblock;
-    setTargets(newtargets);
-    lastRecoveryTime = 0;
   }
 
   /**
-   * Initialize lease recovery for this object
+   * Convert the last block of the file to an under-construction block.
+   * Set its locations.
    */
-  void assignPrimaryDatanode() {
-    //assign the first alive datanode as the primary datanode
-
-    if (targets.length == 0) {
-      NameNode.stateChangeLog.warn("BLOCK*"
-        + " INodeFileUnderConstruction.initLeaseRecovery:"
-        + " No blocks found, lease removed.");
-    }
-
-    int previous = primaryNodeIndex;
-    //find an alive datanode beginning from previous
-    for(int i = 1; i <= targets.length; i++) {
-      int j = (previous + i)%targets.length;
-      if (targets[j].isAlive) {
-        DatanodeDescriptor primary = targets[primaryNodeIndex = j]; 
-        primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
-        NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1]
-          + " recovery started, primary=" + primary);
-        return;
-      }
-    }
-  }
-  
-  /**
-   * Update lastRecoveryTime if expired.
-   * @return true if lastRecoveryTimeis updated. 
-   */
-  synchronized boolean setLastRecoveryTime(long now) {
-    boolean expired = now - lastRecoveryTime > NameNode.LEASE_RECOVER_PERIOD;
-    if (expired) {
-      lastRecoveryTime = now;
-    }
-    return expired;
+  BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+                                          DatanodeDescriptor[] targets)
+  throws IOException {
+    if (blocks == null || blocks.length == 0) {
+      throw new IOException("Trying to update non-existant block. " +
+      		"File is empty.");
+    }
+    BlockInfoUnderConstruction ucBlock =
+      lastBlock.convertToBlockUnderConstruction(
+          BlockUCState.UNDER_CONSTRUCTION, targets);
+    ucBlock.setINode(this);
+    setBlock(numBlocks()-1, ucBlock);
+    return ucBlock;
   }
 }

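The reworked setLastBlock() above replaces the old targets/primary-datanode
bookkeeping: recovery state now lives on the block itself. A rough
self-contained sketch of that conversion (not part of this patch; all types
are simplified stand-ins):

    public class SetLastBlockSketch {
      static class BlockInfo { long id; BlockInfo(long id) { this.id = id; } }
      static class BlockInfoUC extends BlockInfo {
        String[] expectedLocations;   // datanodes expected to hold replicas
        BlockInfoUC(BlockInfo b, String[] locs) { super(b.id); expectedLocations = locs; }
      }

      static BlockInfoUC convertLastBlock(BlockInfo[] blocks, String[] targets) {
        if (blocks == null || blocks.length == 0)
          throw new IllegalStateException("Trying to update a non-existent block: file is empty");
        BlockInfoUC uc = new BlockInfoUC(blocks[blocks.length - 1], targets);
        blocks[blocks.length - 1] = uc;   // the file now references the UC block
        return uc;
      }

      public static void main(String[] args) {
        BlockInfo[] blocks = { new BlockInfo(1), new BlockInfo(2) };
        BlockInfoUC uc = convertLastBlock(blocks, new String[] {"dn1", "dn2"});
        System.out.println("block " + uc.id + " expects "
            + uc.expectedLocations.length + " locations");
      }
    }
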
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Wed Sep 30 22:57:30 2009
@@ -102,7 +102,7 @@
   /**
    * Adds (or re-adds) the lease for the specified file.
    */
-  synchronized void addLease(String holder, String src) {
+  synchronized Lease addLease(String holder, String src) {
     Lease lease = getLease(holder);
     if (lease == null) {
       lease = new Lease(holder);
@@ -113,6 +113,7 @@
     }
     sortedLeasesByPath.put(src, lease);
     lease.paths.add(src);
+    return lease;
   }
 
   /**
@@ -143,11 +144,22 @@
   }
 
   /**
+   * Reassign lease for file src to the new holder.
+   */
+  synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+    assert newHolder != null : "new lease holder is null";
+    if (lease != null) {
+      removeLease(lease, src);
+    }
+    return addLease(newHolder, src);
+  }
+
+  /**
    * Finds the pathname for the specified pendingFile
    */
   synchronized String findPath(INodeFileUnderConstruction pendingFile
       ) throws IOException {
-    Lease lease = getLease(pendingFile.clientName);
+    Lease lease = getLease(pendingFile.getClientName());
     if (lease != null) {
       String src = lease.findPath(pendingFile);
       if (src != null) {
@@ -265,7 +277,11 @@
     Collection<String> getPaths() {
       return paths;
     }
-    
+
+    String getHolder() {
+      return holder;
+    }
+
     void replacePath(String oldpath, String newpath) {
       paths.remove(oldpath);
       paths.add(newpath);
@@ -376,7 +392,13 @@
       oldest.getPaths().toArray(leasePaths);
       for(String p : leasePaths) {
         try {
-          fsnamesystem.internalReleaseLease(oldest, p);
+          if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) {
+            LOG.info("Lease recovery for file " + p
+                + " is complete. File closed.");
+            removing.add(p);
+          } else {
+            LOG.info("Started block recovery for file " + p
+                + " under lease " + oldest);
+          }
         } catch (IOException e) {
           LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
           removing.add(p);

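The new reassignLease() path boils down to "remove the path from the old
holder, re-add it under the new holder". A simplified, self-contained sketch
(not part of this patch; the map-of-sets stands in for the real Lease
structures):

    import java.util.*;

    public class LeaseSketch {
      final Map<String, Set<String>> leases = new HashMap<>();  // holder -> paths

      Set<String> addLease(String holder, String src) {
        Set<String> paths = leases.computeIfAbsent(holder, h -> new HashSet<>());
        paths.add(src);
        return paths;
      }

      void removeLease(String holder, String src) {
        Set<String> paths = leases.get(holder);
        if (paths != null) {
          paths.remove(src);
          if (paths.isEmpty())
            leases.remove(holder);    // drop holders with no paths left
        }
      }

      Set<String> reassignLease(String oldHolder, String src, String newHolder) {
        removeLease(oldHolder, src);
        return addLease(newHolder, src);
      }

      public static void main(String[] args) {
        LeaseSketch lm = new LeaseSketch();
        lm.addLease("DFSClient_42", "/user/a/file");
        lm.reassignLease("DFSClient_42", "/user/a/file", "HDFS_NameNode");
        System.out.println(lm.leases);   // {HDFS_NameNode=[/user/a/file]}
      }
    }
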
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Sep 30 22:57:30 2009
@@ -610,11 +610,12 @@
 
   /**
    */
-  public LocatedBlock addBlock(String src, 
-                               String clientName) throws IOException {
+  public LocatedBlock addBlock(String src, String clientName,
+                               Block previous) throws IOException {
     stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
                          +src+" for "+clientName);
-    LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);
+    LocatedBlock locatedBlock = 
+      namesystem.getAdditionalBlock(src, clientName, previous);
     if (locatedBlock != null)
       myMetrics.numAddBlockOps.inc();
     return locatedBlock;
@@ -633,9 +634,11 @@
   }
 
   /** {@inheritDoc} */
-  public boolean complete(String src, String clientName) throws IOException {
+  public boolean complete(String src, String clientName,
+                          Block last) throws IOException {
     stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName);
-    CompleteFileStatus returnCode = namesystem.completeFile(src, clientName);
+    CompleteFileStatus returnCode =
+      namesystem.completeFile(src, clientName, last);
     if (returnCode == CompleteFileStatus.STILL_WAITING) {
       return false;
     } else if (returnCode == CompleteFileStatus.COMPLETE_SUCCESS) {
@@ -664,10 +667,20 @@
   }
 
   /** {@inheritDoc} */
-  public long nextGenerationStamp(Block block) throws IOException{
-    return namesystem.nextGenerationStampForBlock(block);
+  @Override
+  public LocatedBlock updateBlockForPipeline(Block block, String clientName)
+  throws IOException {
+    return namesystem.updateBlockForPipeline(block, clientName);
   }
 
+
+  @Override
+  public void updatePipeline(String clientName, Block oldBlock,
+      Block newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
+  }
+  
   /** {@inheritDoc} */
   public void commitBlockSynchronization(Block block,
       long newgenerationstamp, long newlength,

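The two new RPCs replace nextGenerationStamp() with an explicit two-step
pipeline-recovery handshake. A rough sketch of how a writer is expected to
drive them (not part of this patch; NameNodeStub is a hypothetical stand-in
for the relevant ClientProtocol subset):

    import java.io.IOException;

    public class PipelineRecoverySketch {
      static class Block { long id, numBytes, genStamp; }

      interface NameNodeStub {
        Block updateBlockForPipeline(Block block, String clientName) throws IOException;
        void updatePipeline(String clientName, Block oldBlock, Block newBlock,
                            String[] newNodes) throws IOException;
      }

      static void recoverPipeline(NameNodeStub nn, String client, Block current,
                                  String[] survivingNodes) throws IOException {
        // 1. obtain a new generation stamp (and access token) for the block
        Block updated = nn.updateBlockForPipeline(current, client);
        // 2. ... the surviving datanodes sync the replica to the new stamp ...
        // 3. commit the new pipeline on the namenode
        nn.updatePipeline(client, current, updated, survivingNodes);
      }

      public static void main(String[] args) throws IOException {
        NameNodeStub nn = new NameNodeStub() {
          long gs = 1000;
          public Block updateBlockForPipeline(Block b, String c) {
            Block nb = new Block();
            nb.id = b.id; nb.numBytes = b.numBytes;
            nb.genStamp = ++gs;      // namenode issues the next generation stamp
            return nb;
          }
          public void updatePipeline(String c, Block o, Block n, String[] nodes) {
            System.out.println("pipeline updated: gs " + o.genStamp + " -> " + n.genStamp);
          }
        };
        Block blk = new Block();
        blk.id = 7; blk.genStamp = 1000; blk.numBytes = 512;
        recoverPipeline(nn, "DFSClient_7", blk, new String[] {"dn1", "dn2"});
      }
    }
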
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Wed Sep 30 22:57:30 2009
@@ -35,10 +35,9 @@
  **********************************************************************/
 public interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 20: SendHeartbeat may return KeyUpdateCommand
-   *     Register returns access keys inside DatanodeRegistration object
+   * 23: nextGenerationStamp() removed.
    */
-  public static final long versionID = 20L;
+  public static final long versionID = 23L;
   
   // error code
   final static int NOTIFY = 0;
@@ -143,12 +142,6 @@
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
   
   /**
-   * @return the next GenerationStamp to be associated with the specified
-   * block. 
-   */
-  public long nextGenerationStamp(Block block) throws IOException;
-
-  /**
    * Commit block synchronization in lease recovery
    */
   public void commitBlockSynchronization(Block block,

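Since removing nextGenerationStamp() is an incompatible change, the versionID
bump above is what keeps mismatched peers from talking. A toy sketch of that
handshake (not part of this patch; the constant and check merely mimic what
VersionedProtocol implementations do):

    import java.io.IOException;

    public class VersionCheckSketch {
      static final long DATANODE_PROTOCOL_VERSION = 23L;   // after this patch

      static void checkVersion(long clientVersion) throws IOException {
        if (clientVersion != DATANODE_PROTOCOL_VERSION)
          throw new IOException("Protocol version mismatch: client="
              + clientVersion + ", server=" + DATANODE_PROTOCOL_VERSION);
      }

      public static void main(String[] args) throws IOException {
        checkVersion(23L);        // an up-to-date datanode passes
        try {
          checkVersion(20L);      // a pre-patch datanode is rejected
        } catch (IOException e) {
          System.out.println(e.getMessage());
        }
      }
    }
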
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java Wed Sep 30 22:57:30 2009
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.ipc.VersionedProtocol;
 
 /** An inter-datanode protocol for updating generation stamp
@@ -31,17 +32,36 @@
   public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
 
   /**
-   * 3: added a finalize parameter to updateBlock
+   * 4: initReplicaRecovery(), updateReplicaUnderRecovery() added.
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
 
   /** @return the BlockMetaDataInfo of a block;
    *  null if the block is not found 
    */
+  @Deprecated
   BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
 
   /**
    * Update the block to the new generation stamp and length.  
    */
-  void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;
+  @Deprecated
+  void updateBlock(Block oldblock, Block newblock, boolean finalize)
+  throws IOException;
+
+  /**
+   * Initialize a replica recovery.
+   * 
+   * @return the actual state of the replica on this data-node, or
+   * null if the data-node does not have the replica.
+   */
+  ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+  throws IOException;
+
+  /**
+   * Update replica with the new generation stamp and length.  
+   */
+  Block updateReplicaUnderRecovery(Block oldBlock,
+                                   long recoveryId,
+                                   long newLength) throws IOException;
 }
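
Finally, a rough end-to-end sketch of the recovery handshake the two new
InterDatanodeProtocol methods enable (not part of this patch; the types are
simplified stand-ins, and agreeing on the minimum reported length is only an
approximation of the real recovery policy):

    import java.io.IOException;
    import java.util.*;

    public class ReplicaRecoverySketch {
      static class ReplicaInfo { long numBytes; ReplicaInfo(long n) { numBytes = n; } }

      interface DatanodeStub {
        ReplicaInfo initReplicaRecovery(long blockId) throws IOException;
        void updateReplicaUnderRecovery(long blockId, long recoveryId, long newLength)
            throws IOException;
      }

      static void recoverBlock(long blockId, long recoveryId, List<DatanodeStub> nodes)
          throws IOException {
        // 1. ask every datanode for its replica state; null means "no replica"
        long minLength = Long.MAX_VALUE;
        List<DatanodeStub> participants = new ArrayList<>();
        for (DatanodeStub dn : nodes) {
          ReplicaInfo info = dn.initReplicaRecovery(blockId);
          if (info != null) {
            participants.add(dn);
            minLength = Math.min(minLength, info.numBytes);
          }
        }
        if (participants.isEmpty())
          throw new IOException("No replicas found for block " + blockId);
        // 2. bring all surviving replicas to the agreed length and recovery id
        for (DatanodeStub dn : participants)
          dn.updateReplicaUnderRecovery(blockId, recoveryId, minLength);
      }

      public static void main(String[] args) throws IOException {
        DatanodeStub dn1 = new DatanodeStub() {
          public ReplicaInfo initReplicaRecovery(long id) { return new ReplicaInfo(100); }
          public void updateReplicaUnderRecovery(long id, long rid, long len) {
            System.out.println("dn1: block " + id + " -> length " + len);
          }
        };
        DatanodeStub dn2 = new DatanodeStub() {
          public ReplicaInfo initReplicaRecovery(long id) { return new ReplicaInfo(80); }
          public void updateReplicaUnderRecovery(long id, long rid, long len) {
            System.out.println("dn2: block " + id + " -> length " + len);
          }
        };
        recoverBlock(7L, 2001L, Arrays.asList(dn1, dn2));   // both truncate to 80
      }
    }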