Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/01 01:39:33 UTC

svn commit: r820497 [5/7] - in /hadoop/hdfs/trunk: ./ .eclipse.templates/.launches/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apach...

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Sep 30 23:39:30 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -563,13 +564,18 @@
     }
     List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
     long totalSize = 0;
+    BlockInfo curBlock;
     while(totalSize<size && iter.hasNext()) {
-      totalSize += addBlock(iter.next(), results);
+      curBlock = iter.next();
+      if(!curBlock.isComplete())  continue;
+      totalSize += addBlock(curBlock, results);
     }
     if(totalSize<size) {
       iter = node.getBlockIterator(); // start from the beginning
       for(int i=0; i<startBlock&&totalSize<size; i++) {
-        totalSize += addBlock(iter.next(), results);
+        curBlock = iter.next();
+        if(!curBlock.isComplete())  continue;
+        totalSize += addBlock(curBlock, results);
       }
     }
 
@@ -707,14 +713,46 @@
     if (doAccessTime && isAccessTimeSupported()) {
       dir.setTimes(src, inode, -1, now(), false);
     }
-    final Block[] blocks = inode.getBlocks();
+    final BlockInfo[] blocks = inode.getBlocks();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
+    }
     if (blocks == null) {
       return null;
     }
-    final List<LocatedBlock> results = blocks.length == 0?
-        new ArrayList<LocatedBlock>(0):
-        blockManager.getBlockLocations(blocks, offset, length, Integer.MAX_VALUE);
-    return inode.createLocatedBlocks(results);
+
+    if (blocks.length == 0) {
+      return new LocatedBlocks(0, inode.isUnderConstruction(),
+          Collections.<LocatedBlock>emptyList(), null, false);
+    } else {
+      final long n = inode.computeFileSize(false);
+      final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
+          blocks, offset, length, Integer.MAX_VALUE);
+      final BlockInfo last = inode.getLastBlock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("last = " + last);
+      }
+
+      if (!last.isComplete()) {
+        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+            blockManager.getBlockLocation(last, n), false);
+      }
+      else {
+        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+            blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
+      }
+    }
+  }
+
+  /** Create a LocatedBlock. */
+  LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+      final long offset, final boolean corrupt) throws IOException {
+    final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
+    if (isAccessTokenEnabled) {
+      lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
+          EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+    }
+    return lb;
   }
 
   /**
@@ -912,40 +950,45 @@
         // If the file is under construction , then it must be in our
         // leases. Find the appropriate lease record.
         //
-        Lease lease = leaseManager.getLease(holder);
-        //
-        // We found the lease for this file. And surprisingly the original
-        // holder is trying to recreate this file. This should never occur.
-        //
-        if (lease != null) {
+        Lease lease = leaseManager.getLeaseByPath(src);
+        if (lease == null) {
           throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 " because current leaseholder is trying to recreate file.");
+              "failed to create file " + src + " for " + holder +
+              " on client " + clientMachine + 
+              " because pendingCreates is non-null but no leases found.");
         }
         //
-        // Find the original holder.
+        // We found the lease for this file. And surprisingly the original
+        // holder is trying to recreate this file. This should never occur.
         //
-        lease = leaseManager.getLease(pendingFile.clientName);
-        if (lease == null) {
+        if (lease.getHolder().equals(holder)) {
           throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 " because pendingCreates is non-null but no leases found.");
-        }
+              "failed to create file " + src + " for " + holder +
+              " on client " + clientMachine + 
+              " because current leaseholder is trying to recreate file.");
+        }
+        assert lease.getHolder().equals(pendingFile.getClientName()) :
+          "Current lease holder " + lease.getHolder() +
+          " does not match file creator " + pendingFile.getClientName();
         //
+        // Current lease holder is different from the requester.
         // If the original holder has not renewed in the last SOFTLIMIT 
-        // period, then start lease recovery.
+        // period, then start lease recovery, otherwise fail.
         //
         if (lease.expiredSoftLimit()) {
           LOG.info("startFile: recover lease " + lease + ", src=" + src);
-          internalReleaseLease(lease, src);
-        }
-        throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
-                                               " on client " + clientMachine + 
-                                               ", because this file is already being created by " +
-                                               pendingFile.getClientName() + 
-                                               " on " + pendingFile.getClientMachine());
+          boolean isClosed = internalReleaseLease(lease, src, null);
+          if(!isClosed)
+            throw new RecoveryInProgressException(
+                "Failed to close file " + src +
+                ". Lease recovery is in progress. Try again later.");
+
+        } else
+          throw new AlreadyBeingCreatedException("failed to create file " +
+              src + " for " + holder + " on client " + clientMachine + 
+              ", because this file is already being created by " +
+              pendingFile.getClientName() + 
+              " on " + pendingFile.getClientMachine());
       }
 
       try {
@@ -998,7 +1041,7 @@
                                         clientMachine,
                                         clientNode);
         dir.replaceNode(src, node, cons);
-        leaseManager.addLease(cons.clientName, src);
+        leaseManager.addLease(cons.getClientName(), src);
 
       } else {
        // Now we can add the name to the filesystem. This file has no
@@ -1014,7 +1057,7 @@
           throw new IOException("DIR* NameSystem.startFile: " +
                                 "Unable to add file to namespace.");
         }
-        leaseManager.addLease(newNode.clientName, src);
+        leaseManager.addLease(newNode.getClientName(), src);
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
                                      +"add "+src+" to namespace for "+holder);
@@ -1048,40 +1091,36 @@
     LocatedBlock lb = null;
     synchronized (this) {
       INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
-
-      BlockInfo[] blocks = file.getBlocks();
-      if (blocks != null && blocks.length > 0) {
-        BlockInfo last = blocks[blocks.length-1];
-        // this is a redundant search in blocksMap
-        // should be resolved by the new implementation of append
-        BlockInfo storedBlock = blockManager.getStoredBlock(last);
-        assert last == storedBlock : "last block should be in the blocksMap";
-        if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
+      BlockInfo lastBlock = file.getLastBlock();
+      if (lastBlock != null) {
+        assert lastBlock == blockManager.getStoredBlock(lastBlock) :
+          "last block of the file is not in blocksMap";
+        if (file.getPreferredBlockSize() > lastBlock.getNumBytes()) {
           long fileLength = file.computeContentSummary().getLength();
-          DatanodeDescriptor[] targets = blockManager.getNodes(storedBlock);
+          DatanodeDescriptor[] targets = blockManager.getNodes(lastBlock);
           // remove the replica locations of this block from the node
           for (int i = 0; i < targets.length; i++) {
-            targets[i].removeBlock(storedBlock);
+            targets[i].removeBlock(lastBlock);
           }
-          // set the locations of the last block in the lease record
-          file.setLastBlock(storedBlock, targets);
+          // convert last block to under-construction and set its locations
+          blockManager.convertLastBlockToUnderConstruction(file, targets);
 
-          lb = new LocatedBlock(last, targets, 
-                                fileLength-storedBlock.getNumBytes());
+          lb = new LocatedBlock(lastBlock, targets, 
+                                fileLength-lastBlock.getNumBytes());
           if (isAccessTokenEnabled) {
             lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
                 .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
           }
 
           // Remove block from replication queue.
-          blockManager.updateNeededReplications(last, 0, 0);
+          blockManager.updateNeededReplications(lastBlock, 0, 0);
 
           // remove this block from the list of pending blocks to be deleted. 
           // This reduces the possibility of triggering HADOOP-1349.
           //
           for (DatanodeDescriptor dd : targets) {
             String datanodeId = dd.getStorageID();
-            blockManager.removeFromInvalidates(datanodeId, last);
+            blockManager.removeFromInvalidates(datanodeId, lastBlock);
           }
         }
       }
@@ -1115,7 +1154,8 @@
    * client to "try again later".
    */
   public LocatedBlock getAdditionalBlock(String src, 
-                                         String clientName
+                                         String clientName,
+                                         Block previous
                                          ) throws IOException {
     long fileLength, blockSize;
     int replication;
@@ -1135,6 +1175,9 @@
 
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
 
+      // commit the last block
+      blockManager.commitLastBlock(pendingFile, previous);
+
       //
       // If we fail this, bad things happen!
       //
@@ -1168,9 +1211,11 @@
         throw new NotReplicatedYetException("Not replicated yet:" + src);
       }
 
+      // complete the penultimate block
+      blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
+
       // allocate new block record block locations in INode.
-      newBlock = allocateBlock(src, pathINodes);
-      pendingFile.setTargets(targets);
+      newBlock = allocateBlock(src, pathINodes, targets);
       
       for (DatanodeDescriptor dn : targets) {
         dn.incBlocksScheduled();
@@ -1250,15 +1295,18 @@
     COMPLETE_SUCCESS
   }
   
-  public CompleteFileStatus completeFile(String src, String holder) throws IOException {
-    CompleteFileStatus status = completeFileInternal(src, holder);
+  public CompleteFileStatus completeFile(String src,
+                                         String holder,
+                                         Block last) throws IOException {
+    CompleteFileStatus status = completeFileInternal(src, holder, last);
     getEditLog().logSync();
     return status;
   }
 
-
-  private synchronized CompleteFileStatus completeFileInternal(String src, 
-                                                String holder) throws IOException {
+  private synchronized CompleteFileStatus completeFileInternal(
+                                            String src, 
+                                            String holder,
+                                            Block last) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
     if (isInSafeMode())
       throw new SafeModeException("Cannot complete file " + src, safeMode);
@@ -1279,7 +1327,12 @@
                                      ("from " + pendingFile.getClientMachine()))
                                   );                      
       return CompleteFileStatus.OPERATION_FAILED;
-    } else if (!checkFileProgress(pendingFile, true)) {
+    } 
+
+    // commit the last block
+    blockManager.commitLastBlock(pendingFile, last);
+
+    if (!checkFileProgress(pendingFile, true)) {
       return CompleteFileStatus.STILL_WAITING;
     }
 
@@ -1312,13 +1365,15 @@
    * @param inodes INode representing each of the components of src. 
    *        <code>inodes[inodes.length-1]</code> is the INode for the file.
    */
-  private Block allocateBlock(String src, INode[] inodes) throws IOException {
+  private Block allocateBlock(String src,
+                              INode[] inodes,
+                              DatanodeDescriptor targets[]) throws IOException {
     Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0); 
     while(isValidBlock(b)) {
       b.setBlockId(FSNamesystem.randBlockId.nextLong());
     }
     b.setGenerationStamp(getGenerationStamp());
-    b = dir.addBlock(src, inodes, b);
+    b = dir.addBlock(src, inodes, b, targets);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
                                  +src+ ". "+b);
     return b;
@@ -1329,12 +1384,12 @@
    * replicated.  If not, return false. If checkall is true, then check
    * all blocks, otherwise check only penultimate block.
    */
-  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
+  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) throws IOException {
     if (checkall) {
       //
       // check all blocks of the file.
       //
-      for (Block block: v.getBlocks()) {
+      for (BlockInfo block: v.getBlocks()) {
         if (!blockManager.checkMinReplication(block)) {
           return false;
         }
@@ -1343,7 +1398,7 @@
       //
       // check the penultimate block of this file
       //
-      Block b = v.getPenultimateBlock();
+      BlockInfo b = v.getPenultimateBlock();
       if (b != null && !blockManager.checkMinReplication(b)) {
         return false;
       }
@@ -1614,20 +1669,31 @@
    * Move a file that is being written to be immutable.
    * @param src The filename
    * @param lease The lease for the client creating the file
-   */
-  void internalReleaseLease(Lease lease, String src) throws IOException {
+   * @param recoveryLeaseHolder reassign lease to this holder if the last block
+   *        needs recovery; keep current holder if null.
+   * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal
+   *         replication;<br>
+   *         RecoveryInProgressException if lease recovery is in progress.<br>
+   *         IOException in case of an error.
+   * @return true  if file has been successfully finalized and closed or 
+   *         false if block recovery has been initiated
+   */
+  boolean internalReleaseLease(
+      Lease lease, String src, String recoveryLeaseHolder)
+  throws AlreadyBeingCreatedException,
+         IOException {
     LOG.info("Recovering lease=" + lease + ", src=" + src);
 
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
-      final String message = "DIR* NameSystem.internalReleaseCreate: "
+      final String message = "DIR* NameSystem.internalReleaseLease: "
         + "attempt to release a create lock on "
         + src + " file does not exist.";
       NameNode.stateChangeLog.warn(message);
       throw new IOException(message);
     }
     if (!iFile.isUnderConstruction()) {
-      final String message = "DIR* NameSystem.internalReleaseCreate: "
+      final String message = "DIR* NameSystem.internalReleaseLease: "
         + "attempt to release a create lock on "
         + src + " but file is already closed.";
       NameNode.stateChangeLog.warn(message);
@@ -1635,39 +1701,123 @@
     }
 
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
+    int nrBlocks = pendingFile.numBlocks();
+    BlockInfo[] blocks = pendingFile.getBlocks();
 
-    // Initialize lease recovery for pendingFile. If there are no blocks 
-    // associated with this file, then reap lease immediately. Otherwise 
-    // renew the lease and trigger lease recovery.
-    if (pendingFile.getTargets() == null ||
-        pendingFile.getTargets().length == 0) {
-      if (pendingFile.getBlocks().length == 0) {
+    int nrCompleteBlocks;
+    BlockInfo curBlock = null;
+    for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
+      curBlock = blocks[nrCompleteBlocks];
+      if(!curBlock.isComplete())
+        break;
+      assert blockManager.checkMinReplication(curBlock) :
+              "A COMPLETE block is not minimally replicated in " + src;
+    }
+
+    // If there are no incomplete blocks associated with this file,
+    // then reap lease immediately and close the file.
+    if(nrCompleteBlocks == nrBlocks) {
+      finalizeINodeFileUnderConstruction(src, pendingFile);
+      NameNode.stateChangeLog.warn("BLOCK*"
+        + " internalReleaseLease: All existing blocks are COMPLETE,"
+        + " lease removed, file closed.");
+      return true;  // closed!
+    }
+
+    // Only the last and the penultimate blocks may be in a non-COMPLETE state.
+    // If the penultimate block is not COMPLETE, then it must be COMMITTED.
+    if(nrCompleteBlocks < nrBlocks - 2 ||
+       nrCompleteBlocks == nrBlocks - 2 &&
+         curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
+      final String message = "DIR* NameSystem.internalReleaseLease: "
+        + "attempt to release a create lock on "
+        + src + " but file is already closed.";
+      NameNode.stateChangeLog.warn(message);
+      throw new IOException(message);
+    }
+
+    // now we know that the last block is not COMPLETE, and
+    // that the penultimate block, if it exists, is either COMPLETE or COMMITTED
+    BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+    BlockUCState lastBlockState = lastBlock.getBlockUCState();
+    BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+    BlockUCState penultimateBlockState = (penultimateBlock == null ?
+        BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
+    assert penultimateBlockState == BlockUCState.COMPLETE ||
+           penultimateBlockState == BlockUCState.COMMITTED :
+           "Unexpected state of penultimate block in " + src;
+
+    switch(lastBlockState) {
+    case COMPLETE:
+      assert false : "Already checked that the last block is incomplete";
+      break;
+    case COMMITTED:
+      // Close file if committed blocks are minimally replicated
+      if(blockManager.checkMinReplication(penultimateBlock) &&
+          blockManager.checkMinReplication(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile);
         NameNode.stateChangeLog.warn("BLOCK*"
-          + " internalReleaseLease: No blocks found, lease removed.");
-        return;
-      }
-      // setup the Inode.targets for the last block from the blockManager
-      //
-      BlockInfo[] blocks = pendingFile.getBlocks();
-      BlockInfo last = blocks[blocks.length-1];
-      DatanodeDescriptor[] targets = blockManager.getNodes(last);
-      pendingFile.setTargets(targets);
+          + " internalReleaseLease: Committed blocks are minimally replicated,"
+          + " lease removed, file closed.");
+        return true;  // closed!
+      }
+      // Cannot close file right now, since some blocks 
+      // are not yet minimally replicated.
+      // This may potentially cause infinite loop in lease recovery
+      // if there are no valid replicas on data-nodes.
+      String message = "DIR* NameSystem.internalReleaseLease: " +
+          "Failed to release lease for file " + src +
+          ". Committed blocks are waiting to be minimally replicated." +
+          " Try again later.";
+      NameNode.stateChangeLog.warn(message);
+      throw new AlreadyBeingCreatedException(message);
+    case UNDER_CONSTRUCTION:
+    case UNDER_RECOVERY:
+      // setup the last block locations from the blockManager if not known
+      if(lastBlock.getNumExpectedLocations() == 0)
+        lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock));
+      // start recovery of the last block for this file
+      long blockRecoveryId = nextGenerationStamp();
+      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+      lastBlock.initializeBlockRecovery(blockRecoveryId);
+      leaseManager.renewLease(lease);
+      // Cannot close file right now, since the last block requires recovery.
+      // This may potentially cause infinite loop in lease recovery
+      // if there are no valid replicas on data-nodes.
+      NameNode.stateChangeLog.warn(
+                "DIR* NameSystem.internalReleaseLease: " +
+                "File " + src + " has not been closed." +
+               " Lease recovery is in progress. " +
+                "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+      break;
     }
-    // start lease recovery of the last block for this file.
-    pendingFile.assignPrimaryDatanode();
-    leaseManager.renewLease(lease);
+    return false;
+  }
+
+  Lease reassignLease(Lease lease, String src, String newHolder,
+                      INodeFileUnderConstruction pendingFile) {
+    if(newHolder == null)
+      return lease;
+    pendingFile.setClientName(newHolder);
+    return leaseManager.reassignLease(lease, src, newHolder);
   }
 
-  private void finalizeINodeFileUnderConstruction(String src,
+
+  private void finalizeINodeFileUnderConstruction(
+      String src,
       INodeFileUnderConstruction pendingFile) throws IOException {
-    leaseManager.removeLease(pendingFile.clientName, src);
+    leaseManager.removeLease(pendingFile.getClientName(), src);
+
+    // complete the penultimate block
+    blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
 
     // The file is no longer pending.
-    // Create permanent INode, update blockmap
+    // Create permanent INode, update blocks
     INodeFile newFile = pendingFile.convertToInodeFile();
     dir.replaceNode(src, pendingFile, newFile);
 
+    // complete last block of the file
+    blockManager.completeBlock(newFile, newFile.numBlocks()-1);
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);
 
@@ -1690,11 +1840,20 @@
       throw new IOException("Block (=" + lastblock + ") not found");
     }
     INodeFile iFile = oldblockinfo.getINode();
-    if (!iFile.isUnderConstruction()) {
+    if (!iFile.isUnderConstruction() || oldblockinfo.isComplete()) {
       throw new IOException("Unexpected block (=" + lastblock
           + ") since the file (=" + iFile.getLocalName()
           + ") is not under construction");
     }
+
+    long recoveryId =
+      ((BlockInfoUnderConstruction)oldblockinfo).getBlockRecoveryId();
+    if(recoveryId != newgenerationstamp) {
+      throw new IOException("The recovery id " + newgenerationstamp
+          + " does not match current recovery id "
+          + recoveryId + " for block " + lastblock); 
+    }
+        
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
 
 
@@ -1703,12 +1862,15 @@
     blockManager.removeBlockFromMap(oldblockinfo);
 
     if (deleteblock) {
-      pendingFile.removeBlock(lastblock);
+      pendingFile.removeLastBlock(lastblock);
     }
     else {
       // update last block, construct newblockinfo and add it to the blocks map
       lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
-      final BlockInfo newblockinfo = blockManager.addINode(lastblock, pendingFile);
+      BlockInfoUnderConstruction newblockinfo = 
+        new BlockInfoUnderConstruction(
+            lastblock, pendingFile.getReplication());
+      blockManager.addINode(newblockinfo, pendingFile);
 
       // find the DatanodeDescriptor objects
       // There should be no locations in the blockManager till now because the
@@ -1727,11 +1889,9 @@
         for (int i = 0; i < descriptors.length; i++) {
           descriptors[i].addBlock(newblockinfo);
         }
-        pendingFile.setLastBlock(newblockinfo, null);
-      } else {
-        // add locations into the INodeUnderConstruction
-        pendingFile.setLastBlock(newblockinfo, descriptors);
       }
+      // add locations into the INodeUnderConstruction
+      pendingFile.setLastBlock(newblockinfo, descriptors);
     }
 
     // If this commit does not want to close the file, persist
@@ -1745,7 +1905,10 @@
       LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
       return;
     }
-    
+
+    // commit the last block
+    blockManager.commitLastBlock(pendingFile, lastblock);
+
     //remove lease, close file
     finalizeINodeFileUnderConstruction(src, pendingFile);
     getEditLog().logSync();
@@ -3342,7 +3505,7 @@
   void setBlockTotal() {
     if (safeMode == null)
       return;
-    safeMode.setBlockTotal((int)getBlocksTotal());
+    safeMode.setBlockTotal((int)getCompleteBlocksTotal());
   }
 
   /**
@@ -3353,6 +3516,33 @@
   }
 
   /**
+   * Get the total number of COMPLETE blocks in the system.
+   * For safe mode only complete blocks are counted.
+   */
+  long getCompleteBlocksTotal() {
+    // Calculate number of blocks under construction
+    long numUCBlocks = 0;
+    for (Lease lease : leaseManager.getSortedLeases()) {
+      for(String path : lease.getPaths()) {
+        INode node = dir.getFileINode(path);
+        assert node != null : "Found a lease for a nonexistent file.";
+        assert node.isUnderConstruction() :
+          "Found a lease for a file that is not under construction.";
+        INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+        BlockInfo[] blocks = cons.getBlocks();
+        if(blocks == null)
+          continue;
+        for(BlockInfo b : blocks) {
+          if(!b.isComplete())
+            numUCBlocks++;
+        }
+      }
+    }
+    LOG.info("Number of blocks under construction: " + numUCBlocks);
+    return getBlocksTotal() - numUCBlocks;
+  }
+
+  /**
    * Enter safe mode manually.
    * @throws IOException
    */
@@ -3671,29 +3861,129 @@
     return gs;
   }
 
+  private INodeFileUnderConstruction checkUCBlock(Block block, String clientName) 
+  throws IOException {
+    // check safe mode
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot get a new generation stamp and an " +
+                                "access token for block " + block, safeMode);
+    
+    // check stored block state
+    BlockInfo storedBlock = blockManager.getStoredBlock(block);
+    if (storedBlock == null || 
+        storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
+        throw new IOException(block + 
+            " does not exist or is not under Construction" + storedBlock);
+    }
+    
+    // check file inode
+    INodeFile file = storedBlock.getINode();
+    if (file==null || !file.isUnderConstruction()) {
+      throw new IOException("The file " + storedBlock + 
+          " is belonged to does not exist or it is not under construction.");
+    }
+    
+    // check lease
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
+    if (clientName == null || !clientName.equals(pendingFile.getClientName())) {
+      throw new LeaseExpiredException("Lease mismatch: " + block + 
+          " is accessed by a non lease holder " + clientName); 
+    }
+
+    return pendingFile;
+  }
+  
   /**
-   * Verifies that the block is associated with a file that has a lease.
-   * Increments, logs and then returns the stamp
+   * Get a new generation stamp together with an access token for 
+   * a block under construction
+   * 
+   * This method is called for recovering a failed pipeline or setting up
+   * a pipeline to append to a block.
+   * 
+   * @param block a block
+   * @param clientName the name of a client
+   * @return a located block with a new generation stamp and an access token
+   * @throws IOException if any error occurs
+   */
+  synchronized LocatedBlock updateBlockForPipeline(Block block, 
+      String clientName) throws IOException {
+    // check validity of parameters
+    checkUCBlock(block, clientName);
+    
+    // get a new generation stamp and an access token
+    block.setGenerationStamp(nextGenerationStamp());
+    LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
+    if (isAccessTokenEnabled) {
+      locatedBlock.setAccessToken(accessTokenHandler.generateToken(
+          block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+    }
+    return locatedBlock;
+  }
+  
+  
+  /**
+   * Update a pipeline for a block under construction
+   * 
+   * @param clientName the name of the client
+   * @param oldBlock the old block
+   * @param newBlock a new block with a new generation stamp and length
+   * @param newNodes datanodes in the pipeline
+   * @throws IOException if any error occurs
    */
-  synchronized long nextGenerationStampForBlock(Block block) throws IOException {
-    BlockInfo storedBlock = blockManager.getStoredBlock(block);
-    if (storedBlock == null) {
-      String msg = block + " is already commited, storedBlock == null.";
-      LOG.info(msg);
+  synchronized void updatePipeline(String clientName, Block oldBlock, 
+      Block newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    LOG.info("updatePipeline(block=" + oldBlock
+        + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+        + ", newLength=" + newBlock.getNumBytes()
+        + ", newNodes=" + Arrays.asList(newNodes)
+        + ", clientName=" + clientName
+        + ")");
+
+    // check the validity of the block and lease holder name
+    final INodeFileUnderConstruction pendingFile = 
+      checkUCBlock(oldBlock, clientName);
+    final BlockInfo oldblockinfo = pendingFile.getLastBlock();
+
+    // check new GS & length: this is not expected
+    if (newBlock.getGenerationStamp() <= oldblockinfo.getGenerationStamp() ||
+        newBlock.getNumBytes() < oldblockinfo.getNumBytes()) {
+      String msg = "Update " + oldBlock + " (len = " + 
+      oldblockinfo.getNumBytes() + ") to an older state: " + newBlock + 
+      " (len = " + newBlock.getNumBytes() +")";
+      LOG.warn(msg);
       throw new IOException(msg);
     }
-    INodeFile fileINode = storedBlock.getINode();
-    if (!fileINode.isUnderConstruction()) {
-      String msg = block + " is already commited, !fileINode.isUnderConstruction().";
-      LOG.info(msg);
-      throw new IOException(msg);
+    
+    // Remove old block from blocks map. This always has to be done
+    // because the generation stamp of this block is changing.
+    blockManager.removeBlockFromMap(oldblockinfo);
+
+    // update last block, construct newblockinfo and add it to the blocks map
+    BlockInfoUnderConstruction newblockinfo = 
+      new BlockInfoUnderConstruction(
+          newBlock, pendingFile.getReplication());
+    blockManager.addINode(newblockinfo, pendingFile);
+
+    // find the DatanodeDescriptor objects
+    DatanodeDescriptor[] descriptors = null;
+    if (newNodes.length > 0) {
+      descriptors = new DatanodeDescriptor[newNodes.length];
+      for(int i = 0; i < newNodes.length; i++) {
+        descriptors[i] = getDatanode(newNodes[i]);
+      }
     }
-    if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
-      String msg = block + " is beening recovered, ignoring this request.";
-      LOG.info(msg);
-      throw new IOException(msg);
+    // add locations into the INodeUnderConstruction
+    pendingFile.setLastBlock(newblockinfo, descriptors);
+
+    // persist blocks only if append is supported
+    String src = leaseManager.findPath(pendingFile);
+    if (supportAppends) {
+      dir.persistBlocks(src, pendingFile);
+      getEditLog().logSync();
     }
-    return nextGenerationStamp();
+    LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
+    return;
   }
 
   // rename was successful. If any part of the renamed subtree had
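
For illustration, here is a minimal, hypothetical sketch of the client-side call sequence that the two new entry points above (updateBlockForPipeline and updatePipeline) enable; the "namenode" proxy, "bytesAcked", and "survivingNodes" are assumed names and are not part of this commit:

    // Hypothetical sketch of pipeline recovery from a writer's point of view.
    // Ask the NameNode for a new generation stamp (the recovery id) plus an access token.
    LocatedBlock updated = namenode.updateBlockForPipeline(currentBlock, clientName);
    long recoveryGS = updated.getBlock().getGenerationStamp();
    // ... rebuild the write pipeline to the surviving datanodes using recoveryGS ...
    Block newBlock = new Block(currentBlock.getBlockId(), bytesAcked, recoveryGS);
    // Report the new generation stamp, length, and pipeline nodes (DatanodeID[]) back.
    namenode.updatePipeline(clientName, currentBlock, newBlock, survivingNodes);
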

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Sep 30 23:39:30 2009
@@ -25,8 +25,6 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 
 /**
  * We keep an in-memory representation of the file/block hierarchy.
@@ -423,10 +421,4 @@
     }
     return null;
   }
-  
-  
-  LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
-    return new LocatedBlocks(computeContentSummary().getLength(), blocks,
-        isUnderConstruction());
-  }
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Sep 30 23:39:30 2009
@@ -122,16 +122,29 @@
 
   /** {@inheritDoc} */
   long[] computeContentSummary(long[] summary) {
-    long bytes = 0;
-    for(Block blk : blocks) {
-      bytes += blk.getNumBytes();
-    }
-    summary[0] += bytes;
+    summary[0] += computeFileSize(true);
     summary[1]++;
     summary[3] += diskspaceConsumed();
     return summary;
   }
 
+  /** Compute file size.
+   * The last block is counted only if it is not a BlockInfoUnderConstruction
+   * or if includesBlockInfoUnderConstruction is true.
+   */
+  long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+    if (blocks == null || blocks.length == 0) {
+      return 0;
+    }
+    final int last = blocks.length - 1;
+    //check if the last block is BlockInfoUnderConstruction
+    long bytes = blocks[last] instanceof BlockInfoUnderConstruction
+                 && !includesBlockInfoUnderConstruction?
+                     0: blocks[last].getNumBytes();
+    for(int i = 0; i < last; i++) {
+      bytes += blocks[i].getNumBytes();
+    }
+    return bytes;
+  }
   
 
   @Override
@@ -173,13 +186,14 @@
   /**
    * Return the penultimate allocated block for this file.
    */
-  Block getPenultimateBlock() {
+  BlockInfo getPenultimateBlock() {
     if (blocks == null || blocks.length <= 1) {
       return null;
     }
     return blocks[blocks.length - 2];
   }
 
+  // SHV !!! this is not used anywhere - remove
   INodeFileUnderConstruction toINodeFileUnderConstruction(
       String clientName, String clientMachine, DatanodeDescriptor clientNode
       ) throws IOException {
@@ -191,4 +205,27 @@
         blocks, getPermissionStatus(),
         clientName, clientMachine, clientNode);
   }
+
+  /**
+   * Get the last block of the file.
+   * Make sure it has the right type.
+   */
+  <T extends BlockInfo> T getLastBlock() throws IOException {
+    if (blocks == null || blocks.length == 0)
+      return null;
+    T returnBlock = null;
+    try {
+      @SuppressWarnings("unchecked")  // ClassCastException is caught below
+      T tBlock = (T)blocks[blocks.length - 1];
+      returnBlock = tBlock;
+    } catch(ClassCastException cce) {
+      throw new IOException("Unexpected last block type: " 
+          + blocks[blocks.length - 1].getClass().getSimpleName());
+    }
+    return returnBlock;
+  }
+
+  int numBlocks() {
+    return blocks == null ? 0 : blocks.length;
+  }
 }
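
A short, hypothetical illustration of the new INodeFile helpers above; "inode" is assumed to be an INodeFile already held by namenode code, and getLastBlock may throw IOException if the last block has an unexpected type:

    long visibleLength = inode.computeFileSize(false); // skips a last block still under construction
    long fullLength    = inode.computeFileSize(true);  // always counts the last block
    BlockInfo lastBlk  = inode.getLastBlock();         // typed accessor; null for an empty file
    int n              = inode.numBlocks();            // 0 when the file has no blocks
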

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Wed Sep 30 23:39:30 2009
@@ -21,16 +21,13 @@
 
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 
 
 class INodeFileUnderConstruction extends INodeFile {
-  final String clientName;         // lease holder
+  private  String clientName;         // lease holder
   private final String clientMachine;
   private final DatanodeDescriptor clientNode; // if client is a cluster node too.
-
-  private int primaryNodeIndex = -1; //the node working on lease recovery
-  private DatanodeDescriptor[] targets = null;   //locations for last block
-  private long lastRecoveryTime = 0;
   
   INodeFileUnderConstruction(PermissionStatus permissions,
                              short replication,
@@ -67,6 +64,10 @@
     return clientName;
   }
 
+  void setClientName(String clientName) {
+    this.clientName = clientName;
+  }
+
   String getClientMachine() {
     return clientMachine;
   }
@@ -83,15 +84,6 @@
     return true;
   }
 
-  DatanodeDescriptor[] getTargets() {
-    return targets;
-  }
-
-  void setTargets(DatanodeDescriptor[] targets) {
-    this.targets = targets;
-    this.primaryNodeIndex = -1;
-  }
-
   //
   // converts a INodeFileUnderConstruction into a INodeFile
   // use the modification time as the access time
@@ -108,10 +100,10 @@
   }
 
   /**
-   * remove a block from the block list. This block should be
+   * Remove a block from the block list. This block should be
    * the last one on the list.
    */
-  void removeBlock(Block oldblock) throws IOException {
+  void removeLastBlock(Block oldblock) throws IOException {
     if (blocks == null) {
       throw new IOException("Trying to delete non-existant block " + oldblock);
     }
@@ -124,57 +116,24 @@
     BlockInfo[] newlist = new BlockInfo[size_1];
     System.arraycopy(blocks, 0, newlist, 0, size_1);
     blocks = newlist;
-    
-    // Remove the block locations for the last block.
-    targets = null;
-  }
-
-  synchronized void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
-      ) throws IOException {
-    if (blocks == null) {
-      throw new IOException("Trying to update non-existant block (newblock="
-          + newblock + ")");
-    }
-    blocks[blocks.length - 1] = newblock;
-    setTargets(newtargets);
-    lastRecoveryTime = 0;
   }
 
   /**
-   * Initialize lease recovery for this object
+   * Convert the last block of the file to an under-construction block.
+   * Set its locations.
    */
-  void assignPrimaryDatanode() {
-    //assign the first alive datanode as the primary datanode
-
-    if (targets.length == 0) {
-      NameNode.stateChangeLog.warn("BLOCK*"
-        + " INodeFileUnderConstruction.initLeaseRecovery:"
-        + " No blocks found, lease removed.");
-    }
-
-    int previous = primaryNodeIndex;
-    //find an alive datanode beginning from previous
-    for(int i = 1; i <= targets.length; i++) {
-      int j = (previous + i)%targets.length;
-      if (targets[j].isAlive) {
-        DatanodeDescriptor primary = targets[primaryNodeIndex = j]; 
-        primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
-        NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1]
-          + " recovery started, primary=" + primary);
-        return;
-      }
-    }
-  }
-  
-  /**
-   * Update lastRecoveryTime if expired.
-   * @return true if lastRecoveryTimeis updated. 
-   */
-  synchronized boolean setLastRecoveryTime(long now) {
-    boolean expired = now - lastRecoveryTime > NameNode.LEASE_RECOVER_PERIOD;
-    if (expired) {
-      lastRecoveryTime = now;
-    }
-    return expired;
+  BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+                                          DatanodeDescriptor[] targets)
+  throws IOException {
+    if (blocks == null || blocks.length == 0) {
+      throw new IOException("Trying to update non-existant block. " +
+      		"File is empty.");
+    }
+    BlockInfoUnderConstruction ucBlock =
+      lastBlock.convertToBlockUnderConstruction(
+          BlockUCState.UNDER_CONSTRUCTION, targets);
+    ucBlock.setINode(this);
+    setBlock(numBlocks()-1, ucBlock);
+    return ucBlock;
   }
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Wed Sep 30 23:39:30 2009
@@ -102,7 +102,7 @@
   /**
    * Adds (or re-adds) the lease for the specified file.
    */
-  synchronized void addLease(String holder, String src) {
+  synchronized Lease addLease(String holder, String src) {
     Lease lease = getLease(holder);
     if (lease == null) {
       lease = new Lease(holder);
@@ -113,6 +113,7 @@
     }
     sortedLeasesByPath.put(src, lease);
     lease.paths.add(src);
+    return lease;
   }
 
   /**
@@ -143,11 +144,22 @@
   }
 
   /**
+   * Reassign lease for file src to the new holder.
+   */
+  synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+    assert newHolder != null : "new lease holder is null";
+    if (lease != null) {
+      removeLease(lease, src);
+    }
+    return addLease(newHolder, src);
+  }
+
+  /**
    * Finds the pathname for the specified pendingFile
    */
   synchronized String findPath(INodeFileUnderConstruction pendingFile
       ) throws IOException {
-    Lease lease = getLease(pendingFile.clientName);
+    Lease lease = getLease(pendingFile.getClientName());
     if (lease != null) {
       String src = lease.findPath(pendingFile);
       if (src != null) {
@@ -265,7 +277,11 @@
     Collection<String> getPaths() {
       return paths;
     }
-    
+
+    String getHolder() {
+      return holder;
+    }
+
     void replacePath(String oldpath, String newpath) {
       paths.remove(oldpath);
       paths.add(newpath);
@@ -376,7 +392,13 @@
       oldest.getPaths().toArray(leasePaths);
       for(String p : leasePaths) {
         try {
-          fsnamesystem.internalReleaseLease(oldest, p);
+          if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) {
+            LOG.info("Lease recovery for file " + p +
+                          " is complete. File closed.");
+            removing.add(p);
+          } else
+            LOG.info("Started block recovery for file " + p +
+                          " lease " + oldest);
         } catch (IOException e) {
           LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
           removing.add(p);
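
A brief, hypothetical illustration of how the new reassignLease hook is meant to be used: when the NameNode itself drives recovery (as in the Monitor loop above), it takes over the lease under a well-known holder name before renewing it; the variable names are assumed:

    // Hypothetical sketch; mirrors FSNamesystem.reassignLease in this commit.
    Lease recoveryLease = leaseManager.reassignLease(lease, src, "HDFS_NameNode");
    leaseManager.renewLease(recoveryLease);
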

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Sep 30 23:39:30 2009
@@ -610,11 +610,12 @@
 
   /**
    */
-  public LocatedBlock addBlock(String src, 
-                               String clientName) throws IOException {
+  public LocatedBlock addBlock(String src, String clientName,
+                               Block previous) throws IOException {
     stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
                          +src+" for "+clientName);
-    LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);
+    LocatedBlock locatedBlock = 
+      namesystem.getAdditionalBlock(src, clientName, previous);
     if (locatedBlock != null)
       myMetrics.numAddBlockOps.inc();
     return locatedBlock;
@@ -633,9 +634,11 @@
   }
 
   /** {@inheritDoc} */
-  public boolean complete(String src, String clientName) throws IOException {
+  public boolean complete(String src, String clientName,
+                          Block last) throws IOException {
     stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName);
-    CompleteFileStatus returnCode = namesystem.completeFile(src, clientName);
+    CompleteFileStatus returnCode =
+      namesystem.completeFile(src, clientName, last);
     if (returnCode == CompleteFileStatus.STILL_WAITING) {
       return false;
     } else if (returnCode == CompleteFileStatus.COMPLETE_SUCCESS) {
@@ -664,10 +667,20 @@
   }
 
   /** {@inheritDoc} */
-  public long nextGenerationStamp(Block block) throws IOException{
-    return namesystem.nextGenerationStampForBlock(block);
+  @Override
+  public LocatedBlock updateBlockForPipeline(Block block, String clientName)
+  throws IOException {
+    return namesystem.updateBlockForPipeline(block, clientName);
   }
 
+
+  @Override
+  public void updatePipeline(String clientName, Block oldBlock,
+      Block newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
+  }
+  
   /** {@inheritDoc} */
   public void commitBlockSynchronization(Block block,
       long newgenerationstamp, long newlength,

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * BlockRecoveryCommand is an instruction to a data-node to recover
+ * the specified blocks.
+ *
+ * The data-node that receives this command treats itself as a primary
+ * data-node in the recover process.
+ *
+ * Block recovery is identified by a recoveryId, which is also the new
+ * generation stamp that the block will have after the recovery succeeds.
+ */
+public class BlockRecoveryCommand extends DatanodeCommand {
+  Collection<RecoveringBlock> recoveringBlocks;
+
+  /**
+   * This is a block with locations from which it should be recovered
+   * and the new generation stamp, which the block will have after 
+   * successful recovery.
+   * 
+     * The new generation stamp of the block also plays the role of the recovery id.
+   */
+  public static class RecoveringBlock extends LocatedBlock {
+    private long newGenerationStamp;
+
+    /**
+     * Create empty RecoveringBlock.
+     */
+    public RecoveringBlock() {
+      super();
+      newGenerationStamp = -1L;
+    }
+
+    /**
+     * Create RecoveringBlock.
+     */
+    public RecoveringBlock(Block b, DatanodeInfo[] locs, long newGS) {
+      super(b, locs, -1, false); // startOffset is unknown
+      this.newGenerationStamp = newGS;
+    }
+
+    /**
+     * Return the new generation stamp of the block,
+     * which also plays the role of the recovery id.
+     */
+    public long getNewGenerationStamp() {
+      return newGenerationStamp;
+    }
+
+    ///////////////////////////////////////////
+    // Writable
+    ///////////////////////////////////////////
+    static {                                      // register a ctor
+      WritableFactories.setFactory
+        (RecoveringBlock.class,
+         new WritableFactory() {
+           public Writable newInstance() { return new RecoveringBlock(); }
+         });
+    }
+
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      out.writeLong(newGenerationStamp);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      newGenerationStamp = in.readLong();
+    }
+  }
+
+  /**
+   * Create empty BlockRecoveryCommand.
+   */
+  public BlockRecoveryCommand() {
+    this(0);
+  }
+
+  /**
+   * Create BlockRecoveryCommand with
+   * the specified capacity for recovering blocks.
+   */
+  public BlockRecoveryCommand(int capacity) {
+    super(DatanodeProtocol.DNA_RECOVERBLOCK);
+    recoveringBlocks = new ArrayList<RecoveringBlock>(capacity);
+  }
+
+  /**
+   * Return the list of recovering blocks.
+   */
+  public Collection<RecoveringBlock> getRecoveringBlocks() {
+    return recoveringBlocks;
+  }
+
+  /**
+   * Add recovering block to the command.
+   */
+  public void add(RecoveringBlock block) {
+    recoveringBlocks.add(block);
+  }
+
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (BlockRecoveryCommand.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new BlockRecoveryCommand(); }
+       });
+  }
+
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(recoveringBlocks.size());
+    for(RecoveringBlock block : recoveringBlocks) {
+      block.write(out);
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int numBlocks = in.readInt();
+    recoveringBlocks = new ArrayList<RecoveringBlock>(numBlocks);
+    for(int i = 0; i < numBlocks; i++) {
+      RecoveringBlock b = new RecoveringBlock();
+      b.readFields(in);
+      add(b);
+    }
+  }
+}
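
Because BlockRecoveryCommand and RecoveringBlock register WritableFactories and implement write/readFields, they can be exercised with Hadoop's in-memory Writable buffers. A small, self-contained sketch follows; the block id, length, and generation stamps are made-up values:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class RecoveryCommandRoundTrip {
      public static void main(String[] args) throws IOException {
        Block blk = new Block(1234L, 65536L, 7L);           // id, length, old generation stamp
        long newGS = 8L;                                    // recovery id = new generation stamp
        BlockRecoveryCommand cmd = new BlockRecoveryCommand(1);
        cmd.add(new RecoveringBlock(blk, new DatanodeInfo[0], newGS));

        DataOutputBuffer out = new DataOutputBuffer();
        cmd.write(out);                                     // Writable serialization
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        BlockRecoveryCommand copy = new BlockRecoveryCommand();
        copy.readFields(in);

        RecoveringBlock rb = copy.getRecoveringBlocks().iterator().next();
        System.out.println(rb.getBlock() + " newGS=" + rb.getNewGenerationStamp());
      }
    }
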

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Wed Sep 30 23:39:30 2009
@@ -35,10 +35,9 @@
  **********************************************************************/
 public interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 20: SendHeartbeat may return KeyUpdateCommand
-   *     Register returns access keys inside DatanodeRegistration object
+   * 23: nextGenerationStamp() removed.
    */
-  public static final long versionID = 20L;
+  public static final long versionID = 23L;
   
   // error code
   final static int NOTIFY = 0;
@@ -143,12 +142,6 @@
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
   
   /**
-   * @return the next GenerationStamp to be associated with the specified
-   * block. 
-   */
-  public long nextGenerationStamp(Block block) throws IOException;
-
-  /**
    * Commit block synchronization in lease recovery
    */
   public void commitBlockSynchronization(Block block,

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java Wed Sep 30 23:39:30 2009
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.ipc.VersionedProtocol;
 
 /** An inter-datanode protocol for updating generation stamp
@@ -31,17 +32,36 @@
   public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
 
   /**
-   * 3: added a finalize parameter to updateBlock
+   * 4: initReplicaRecovery(), updateReplicaUnderRecovery() added.
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
 
   /** @return the BlockMetaDataInfo of a block;
    *  null if the block is not found 
    */
+  @Deprecated
   BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
 
   /**
    * Update the block to the new generation stamp and length.  
    */
-  void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;
+  @Deprecated
+  void updateBlock(Block oldblock, Block newblock, boolean finalize)
+  throws IOException;
+
+  /**
+   * Initialize a replica recovery.
+   * 
+   * @return actual state of the replica on this data-node or 
+   * null if data-node does not have the replica.
+   */
+  ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+  throws IOException;
+
+  /**
+   * Update replica with the new generation stamp and length.  
+   */
+  Block updateReplicaUnderRecovery(Block oldBlock,
+                                   long recoveryId,
+                                   long newLength) throws IOException;
 }

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Replica recovery information.
+ */
+public class ReplicaRecoveryInfo extends Block {
+  private ReplicaState originalState;
+
+  public ReplicaRecoveryInfo() {
+  }
+
+  public ReplicaRecoveryInfo(Block r, ReplicaState rState) {
+    super(r);
+    originalState = rState;
+  }
+
+  public ReplicaState getOriginalReplicaState() {
+    return originalState;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (ReplicaRecoveryInfo.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new ReplicaRecoveryInfo(); }
+       });
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    originalState = ReplicaState.read(in); 
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    originalState.write(out);
+  }
+}
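
ReplicaRecoveryInfo is a Writable, so on the wire it is simply the Block fields
followed by the original replica state. A small round-trip sketch (the
FINALIZED state and the DataOutputBuffer/DataInputBuffer helpers are taken from
the surrounding Hadoop codebase, not from this file, and the class below is
illustrative only):

  import java.io.IOException;

  import org.apache.hadoop.hdfs.protocol.Block;
  import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
  import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
  import org.apache.hadoop.io.DataInputBuffer;
  import org.apache.hadoop.io.DataOutputBuffer;

  /** Illustrative serialization round trip, not part of this commit. */
  class ReplicaRecoveryInfoRoundTrip {
    public static void main(String[] args) throws IOException {
      ReplicaRecoveryInfo before = new ReplicaRecoveryInfo(
          new Block(1L, 512L, 100L), ReplicaState.FINALIZED);

      DataOutputBuffer out = new DataOutputBuffer();
      before.write(out);                      // Block fields + original state

      DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      ReplicaRecoveryInfo after = new ReplicaRecoveryInfo();
      after.readFields(in);

      // equals()/hashCode() delegate to Block, so only block identity is compared;
      // the original replica state is carried alongside it.
      assert after.equals(before);
      assert after.getOriginalReplicaState() == before.getOriginalReplicaState();
    }
  }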

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Wed Sep 30 23:39:30 2009
@@ -96,7 +96,7 @@
 class ImageLoaderCurrent implements ImageLoader {
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
-  private static int [] versions = {-16, -17, -18, -19};
+  private static int [] versions = {-16, -17, -18, -19, -20};
   private int imageVersion = 0;
 
   /* (non-Javadoc)

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fi;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+/** Helper methods and actions for hflush() fault injection tests */
+public class FiHFlushTestUtil extends DataTransferTestUtil {
+
+  /** {@inheritDoc} */
+  public static PipelineTest initTest() {
+    return thepipelinetest = new HFlushTest();
+  }
+  
+  /** Disk error action for fault injection tests */
+  public static class DerrAction extends DataTransferTestUtil.DataNodeAction {
+    /**
+     * @param currentTest The name of the test
+     * @param index       The index of the datanode
+     */
+    public DerrAction(String currentTest, int index) {
+      super(currentTest, index);
+    }
+
+    /** {@inheritDoc} */
+    public void run(DatanodeID id) throws IOException {
+      final Pipeline p = getPipelineTest().getPipeline(id);
+      if (p == null) {
+        FiTestUtil.LOG.info("FI: couldn't find a pipeline for " + id);
+        return;
+      }
+      if (p.contains(index, id)) {
+        final String s = super.toString(id);
+        FiTestUtil.LOG.info(s);
+        throw new DiskErrorException(s);
+      }
+    }
+  }
+  
+  /** Class that adds a new type of action */
+  public static class HFlushTest extends DataTransferTest {
+    public final ActionContainer<DatanodeID> fiCallHFlush = 
+      new ActionContainer<DatanodeID>();
+  }
+}
\ No newline at end of file

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Wed Sep 30 23:39:30 2009
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream.DataStreamer;
 
 import org.junit.Assert;
@@ -35,7 +36,7 @@
 
   before(DataStreamer datastreamer) : callCreateBlockOutputStream(datastreamer) {
     Assert.assertFalse(datastreamer.hasError);
-    Assert.assertEquals(0, datastreamer.errorIndex);
+    Assert.assertEquals(-1, datastreamer.errorIndex);
   }
 
   pointcut pipelineInitNonAppend(DataStreamer datastreamer):
@@ -48,8 +49,9 @@
         + datastreamer.hasError + " errorIndex=" + datastreamer.errorIndex);
     try {
       if (datastreamer.hasError) {
-        DataTransferTestUtil.getDataTransferTest().fiPipelineInitErrorNonAppend
-            .run(datastreamer.errorIndex);
+        DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+        if (dtTest != null)
+          dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
       }
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -66,19 +68,18 @@
         + " errorIndex=" + datastreamer.errorIndex);
   }
 
-  pointcut pipelineErrorAfterInit(boolean onError, boolean isAppend,
-      DataStreamer datastreamer):
-    call(* processDatanodeError(boolean, boolean))
-    && args(onError, isAppend)
-    && target(datastreamer)
-    && if(onError && !isAppend);
+  pointcut pipelineErrorAfterInit(DataStreamer datastreamer):
+    call(* processDatanodeError())
+    && within (DFSClient.DFSOutputStream.DataStreamer)
+    && target(datastreamer);
 
-  before(DataStreamer datastreamer) : pipelineErrorAfterInit(boolean, boolean, datastreamer) {
+  before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) {
     LOG.info("FI: before pipelineErrorAfterInit: errorIndex="
         + datastreamer.errorIndex);
     try {
-      DataTransferTestUtil.getDataTransferTest().fiPipelineErrorAfterInit
-          .run(datastreamer.errorIndex);
+      DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+      if (dtTest != null)
+        dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj Wed Sep 30 23:39:30 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiHFlushTestUtil.HFlushTest;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+public aspect HFlushAspects {
+  public static final Log LOG = LogFactory.getLog(HFlushAspects.class);
+
+  pointcut hflushCall (DFSOutputStream outstream) :
+    execution(void DFSOutputStream.hflush(..))
+    && target (outstream); 
+  
+  /** This advice is supposed to initiate a call to the action (fiCallHFlush),
+   *  which will throw a DiskErrorException if a pipeline has been created
+   *  and the datanodes used belong to that very pipeline
+   */
+  after (DFSOutputStream streamer) throws IOException : hflushCall(streamer) {
+    LOG.info("FI: hflush for any datanode");    
+    LOG.info("FI: hflush " + thisJoinPoint.getThis());
+    DatanodeInfo[] nodes = streamer.getPipeline();
+    if (nodes == null) {
+        LOG.info("No pipeline is built");
+        return;
+    }
+    if (DataTransferTestUtil.getPipelineTest() == null) {
+        LOG.info("No test has been initialized");    
+        return;
+    }
+    for (int i=0; i<nodes.length; i++) {
+        ((HFlushTest)DataTransferTestUtil.getPipelineTest()).fiCallHFlush.run(nodes[i]);
+    }
+  }
+}
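
The advice above only fires actions that a test has registered: for every
datanode in the current pipeline it looks up the pipeline test installed
through DataTransferTestUtil and runs its fiCallHFlush container. A test
therefore arms the fault before writing, roughly as follows (the test name and
datanode index are placeholders; the real wiring is in TestFiHFlush below):

  // Arm a disk-error fault for datanode 0 of the pipeline; every hflush()
  // advised by HFlushAspects will then run this action for each pipeline node.
  HFlushTest hft = (HFlushTest) FiHFlushTestUtil.initTest();
  hft.fiCallHFlush.set(new DerrAction("someTest", 0));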

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.FiHFlushTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.FiHFlushTestUtil.DerrAction;
+import org.apache.hadoop.fi.FiHFlushTestUtil.HFlushTest;
+
+import java.io.IOException;
+
+/** Class provides basic fault injection tests according to the test plan
+    of HDFS-265
+ */
+public class TestFiHFlush {
+  
+  /** Method initializes a test and sets the required actions to be used later
+   * by an injected advice
+   * @param conf mini cluster configuration
+   * @param methodName String representation of the test method invoking this
+   * method
+   * @param block_size required size of the file's block
+   * @param a action to be set for the test
+   * @throws IOException in case of any errors
+   */
+  private static void runDiskErrorTest (final Configuration conf, 
+      final String methodName, final int block_size, DerrAction a)
+      throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final HFlushTest hft = (HFlushTest) FiHFlushTestUtil.initTest();
+    hft.fiCallHFlush.set(a);
+    TestHFlush.doTheJob(conf, methodName, block_size, (short)3);
+  }
+  
+  /** The test calls
+   * {@link #runDiskErrorTest(Configuration, String, int, DerrAction)}
+   * to make a number of writes within a block's boundaries.
+   * Although hflush() is called, the test shouldn't expect an IOException
+   * in this case because the invocation happens after the write() call
+   * is complete, when the pipeline doesn't exist anymore.
+   * Thus, the injected fault won't be triggered for the 0th datanode
+   */
+  @Test
+  public void hFlushFi01_a() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runDiskErrorTest(new Configuration(), methodName, 
+        AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 0));
+  }
+
+  /** The test calls
+   * {@link #runDiskErrorTest(Configuration, String, int, DerrAction)}
+   * to make a number of writes across block boundaries.
+   * hflush() is called after each write() during the pipeline's lifetime.
+   * Thus, the injected fault ought to be triggered for the 0th datanode
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi01_b() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName, 
+        customBlockSize, new DerrAction(methodName, 0));
+  }
+  
+  /** Similar to {@link #hFlushFi01_b()} but writing happens
+   * across block and checksum boundaries
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi01_c() throws IOException { 
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 400;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName, 
+        customBlockSize, new DerrAction(methodName, 0));
+  }
+
+  /** Similar to {@link #hFlushFi01_a()} but for a pipeline's 1st datanode
+   */
+  @Test
+  public void hFlushFi02_a() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runDiskErrorTest(new Configuration(), methodName,
+        AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 1));
+  }
+
+  /** Similar to {@link #hFlushFi01_b()} but for a pipeline's 1st datanode
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi02_b() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName,
+        customBlockSize, new DerrAction(methodName, 1));
+  }
+
+  /** Similar to {@link #hFlushFi01_c()} but for a pipeline's 1st datanode
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi02_c() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 400;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName,
+        customBlockSize, new DerrAction(methodName, 1));
+  }
+  
+  /** Similar to {@link #hFlushFi01_a()} but for a pipeline's 2nd datanode
+   */
+  @Test
+  public void hFlushFi03_a() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runDiskErrorTest(new Configuration(), methodName,
+        AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 2));
+  }
+  
+  /** Similar to {@link #hFlushFi01_b()} but for a pipeline's 2nd datanode
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi03_b() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName,
+        customBlockSize, new DerrAction(methodName, 2));
+  }
+
+  /** Similar to {@link #hFlushFi01_c()} but for a pipeline's 2nd datanode
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi03_c() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 400;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName,
+        customBlockSize, new DerrAction(methodName, 2));
+  }
+}

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj Wed Sep 30 23:39:30 2009
@@ -27,7 +27,7 @@
   public static final Log LOG = LogFactory.getLog(ClientProtocolAspects.class);
 
   pointcut addBlock():
-    call(LocatedBlock ClientProtocol.addBlock(String, String));
+    call(LocatedBlock ClientProtocol.addBlock(String, String,..));
 
   after() returning(LocatedBlock lb): addBlock() {
     PipelineTest pipelineTest = DataTransferTestUtil.getPipelineTest();

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj Wed Sep 30 23:39:30 2009
@@ -60,8 +60,9 @@
     final DataNode d = dataxceiver.getDataNode();
     LOG.info("FI: statusRead " + status + ", datanode="
         + d.getDatanodeRegistration().getName());    
-    DataTransferTestUtil.getDataTransferTest().fiStatusRead.run(
-        d.getDatanodeRegistration());
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null) 
+      dtTest.fiStatusRead.run(d.getDatanodeRegistration());
   }
 
   pointcut receiverOpWriteBlock(DataXceiver dataxceiver):

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj Wed Sep 30 23:39:30 2009
@@ -35,11 +35,9 @@
   // the following will inject faults inside of the method in question 		
     execution (* FSDataset.getBlockFile(..)) && !within(FSDatasetAspects +);
 
-  // the following will inject faults before the actual call of the method
-  // call (* FSDataset.getBlockFile(..)) && !within(FSDatasetAspects +);
-
-  pointcut callCreateBlockWriteStream() : 
-    call (BlockWriteStreams FSDataset.createBlockWriteStreams(..)) 
+  pointcut callCreateBlockWriteStream(ReplicaInPipeline repl) : 
+    call (BlockWriteStreams createStreams())
+    && target (repl)
       && !within(FSDatasetAspects +);
 
   // This aspect specifies the logic of our fault point.
@@ -54,7 +52,7 @@
     }
   }
 
-  before() throws DiskOutOfSpaceException : callCreateBlockWriteStream() {
+  before(ReplicaInPipeline repl) throws DiskOutOfSpaceException : callCreateBlockWriteStream(repl) {
     if (ProbabilityModel.injectCriteria(FSDataset.class.getSimpleName())) {
       LOG.info("Before the injection point");
       Thread.dumpStack();

Modified: hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml Wed Sep 30 23:39:30 2009
@@ -208,17 +208,22 @@
      </Match>
 
      <!--
-       CreateBlockWriteStreams and getTmpInputStreams are pretty much like a stream constructor.
+       getTmpInputStreams is pretty much like a stream constructor.
        The newly created streams are not supposed to be closed in the constructor. So ignore
        the OBL warning.
      -->
      <Match>
        <Class name="org.apache.hadoop.hdfs.server.datanode.FSDataset" />
-       <Or>
-         <Method name="createBlockWriteStreams" />
-         <Method name="getTmpInputStreams" />
-       </Or>
+       <Method name="getTmpInputStreams" />
        <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
      </Match>
 
+     <!--
+      ResponseProcessor is a thread that is designed to catch RuntimeException.
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer$ResponseProcessor" />
+       <Method name="run" />
+       <Bug pattern="REC_CATCH_EXCEPTION" />
+     </Match>
  </FindBugsFilter>

Propchange: hadoop/hdfs/trunk/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 23:39:30 2009
@@ -1,2 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
 /hadoop/core/trunk/src/test/hdfs:776175-785643
+/hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/test/hdfs:820487

Propchange: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java Wed Sep 30 23:39:30 2009
@@ -33,7 +33,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 
 /** Utilities for append-related tests */ 
-class AppendTestUtil {
+public class AppendTestUtil {
   /** For specifying the random number generator seed,
    *  change the following value:
    */
@@ -84,8 +84,15 @@
       LOG.info("ms=" + ms, e);
     }
   }
-
-  static FileSystem createHdfsWithDifferentUsername(Configuration conf
+  
+  /**
+   * Returns a reference to a new FileSystem instance created
+   * with a different user name
+   * @param conf current Configuration
+   * @return FileSystem instance
+   * @throws IOException
+   */
+  public static FileSystem createHdfsWithDifferentUsername(Configuration conf
       ) throws IOException {
     Configuration conf2 = new Configuration(conf);
     String username = UserGroupInformation.getCurrentUGI().getUserName()+"_XXX";
@@ -134,7 +141,7 @@
    *  Make sure to call close() on the returned stream
    *  @throws IOException an exception might be thrown
    */
-  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+  public static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
       throws IOException {
     return fileSys.create(name, true,
         fileSys.getConf().getInt("io.file.buffer.size", 4096),
@@ -146,7 +153,7 @@
    *  the specified byte[] buffer's content
    *  @throws IOException an exception might be thrown
    */
-  static void checkFullFile(FileSystem fs, Path name, int len,
+  public static void checkFullFile(FileSystem fs, Path name, int len,
                             final byte[] compareContent, String message) throws IOException {
     FSDataInputStream stm = fs.open(name);
     byte[] actual = new byte[len];

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Sep 30 23:39:30 2009
@@ -110,7 +110,7 @@
   /** create nFiles with random names and directory hierarchies
    *  with random (but reproducible) data in them.
    */
-  void createFiles(FileSystem fs, String topdir,
+  public void createFiles(FileSystem fs, String topdir,
                    short replicationFactor) throws IOException {
     files = new MyFile[nFiles];
     

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java Wed Sep 30 23:39:30 2009
@@ -21,6 +21,7 @@
 import java.net.UnknownHostException;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
+import java.util.Arrays;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -200,7 +201,7 @@
           }
           for (int i = 1; i <= replication; ++i) { 
             // inject blocks for dn_i into dn_i and replica in dn_i's neighbors 
-            mc.injectBlocks((i_dn + i- 1)% numDataNodes, blocks);
+            mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks));
             System.out.println("Injecting blocks of dn " + i_dn  + " into dn" + 
                 ((i_dn + i- 1)% numDataNodes));
           }