Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/01/30 20:16:16 UTC

svn commit: r1237935 - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/da...

Author: todd
Date: Mon Jan 30 19:16:15 2012
New Revision: 1237935

URL: http://svn.apache.org/viewvc?rev=1237935&view=rev
Log:
HDFS-2691. Fixes for pipeline recovery in an HA cluster: report RBW replicas immediately upon pipeline creation. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Mon Jan 30 19:16:15 2012
@@ -137,3 +137,5 @@ HDFS-2838. NPE in FSNamesystem when in s
 HDFS-2805. Add a test for a federated cluster with HA NNs. (Brandon Li via jitendra)
 
 HDFS-2841. HAAdmin does not work if security is enabled. (atm)
+
+HDFS-2691. Fixes for pipeline recovery in an HA cluster: report RBW replicas immediately upon pipeline creation. (todd)

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Mon Jan 30 19:16:15 2012
@@ -116,6 +116,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -813,6 +814,23 @@ public class PBHelper {
     ReceivedDeletedBlockInfoProto.Builder builder = 
         ReceivedDeletedBlockInfoProto.newBuilder();
     
+    ReceivedDeletedBlockInfoProto.BlockStatus status;
+    switch (receivedDeletedBlockInfo.getStatus()) {
+    case RECEIVING_BLOCK:
+      status = ReceivedDeletedBlockInfoProto.BlockStatus.RECEIVING;
+      break;
+    case RECEIVED_BLOCK:
+      status = ReceivedDeletedBlockInfoProto.BlockStatus.RECEIVED;
+      break;
+    case DELETED_BLOCK:
+      status = ReceivedDeletedBlockInfoProto.BlockStatus.DELETED;
+      break;
+    default:
+      throw new IllegalArgumentException("Bad status: " +
+          receivedDeletedBlockInfo.getStatus());
+    }
+    builder.setStatus(status);
+    
     if (receivedDeletedBlockInfo.getDelHints() != null) {
       builder.setDeleteHint(receivedDeletedBlockInfo.getDelHints());
     }
@@ -844,7 +862,21 @@ public class PBHelper {
 
   public static ReceivedDeletedBlockInfo convert(
       ReceivedDeletedBlockInfoProto proto) {
-    return new ReceivedDeletedBlockInfo(PBHelper.convert(proto.getBlock()),
+    ReceivedDeletedBlockInfo.BlockStatus status = null;
+    switch (proto.getStatus()) {
+    case RECEIVING:
+      status = BlockStatus.RECEIVING_BLOCK;
+      break;
+    case RECEIVED:
+      status = BlockStatus.RECEIVED_BLOCK;
+      break;
+    case DELETED:
+      status = BlockStatus.DELETED_BLOCK;
+      break;
+    }
+    return new ReceivedDeletedBlockInfo(
+        PBHelper.convert(proto.getBlock()),
+        status,
         proto.hasDeleteHint() ? proto.getDeleteHint() : null);
   }
   

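The two switch statements above are the glue between the in-memory enum (ReceivedDeletedBlockInfo.BlockStatus) and the wire enum (ReceivedDeletedBlockInfoProto.BlockStatus). A minimal round-trip sketch, as hypothetical test code built only on the constructor and PBHelper.convert() overloads shown in this diff:

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
    import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;

    public class BlockStatusRoundTrip {
      public static void main(String[] args) {
        // blockId, numBytes, and generation stamp are arbitrary sample values.
        ReceivedDeletedBlockInfo original = new ReceivedDeletedBlockInfo(
            new Block(42L, 1024L, 100L),
            BlockStatus.RECEIVING_BLOCK,
            null); // no delHint accompanies a RECEIVING notification

        // Java -> protobuf -> Java; run with -ea to enable the assert.
        ReceivedDeletedBlockInfo copy =
            PBHelper.convert(PBHelper.convert(original));
        assert original.equals(copy) : "status lost in proto translation";
      }
    }

Note that the proto-to-Java switch has no default case, so an unrecognized wire value leaves status null and fails later, in the NN's dispatch, rather than at conversion time.
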
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Jan 30 19:16:15 2012
@@ -2256,13 +2256,19 @@ assert storedBlock.findDatanode(dn) < 0 
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.remove(block);
-
+    processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
+        delHintNode);
+  }
+  
+  private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
+      ReplicaState reportedState, DatanodeDescriptor delHintNode)
+      throws IOException {
     // blockReceived reports a finalized block
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    processReportedBlock(node, block, ReplicaState.FINALIZED,
+    processReportedBlock(node, block, reportedState,
                               toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
@@ -2286,47 +2292,66 @@ assert storedBlock.findDatanode(dn) < 0 
     }
   }
 
-  /** The given node is reporting that it received/deleted certain blocks. */
-  public void blockReceivedAndDeleted(final DatanodeID nodeID, 
+  /**
+   * The given node is reporting incremental information about some blocks.
+   * This includes blocks that are starting to be received, completed being
+   * received, or deleted.
+   */
+  public void processIncrementalBlockReport(final DatanodeID nodeID, 
      final String poolId, 
-     final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
+     final ReceivedDeletedBlockInfo blockInfos[]
   ) throws IOException {
     namesystem.writeLock();
     int received = 0;
     int deleted = 0;
+    int receiving = 0;
     try {
       final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
       if (node == null || !node.isAlive) {
         NameNode.stateChangeLog
-            .warn("BLOCK* blockReceivedDeleted"
+            .warn("BLOCK* processIncrementalBlockReport"
                 + " is received from dead or unregistered node "
                 + nodeID.getName());
         throw new IOException(
-            "Got blockReceivedDeleted message from unregistered or dead node");
+            "Got incremental block report from unregistered or dead node");
       }
 
-      for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
-        if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
-          removeStoredBlock(
-              receivedAndDeletedBlocks[i].getBlock(), node);
+      for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+        switch (rdbi.getStatus()) {
+        case DELETED_BLOCK:
+          removeStoredBlock(rdbi.getBlock(), node);
           deleted++;
-        } else {
-          addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
-              receivedAndDeletedBlocks[i].getDelHints());
+          break;
+        case RECEIVED_BLOCK:
+          addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
           received++;
+          break;
+        case RECEIVING_BLOCK:
+          receiving++;
+          processAndHandleReportedBlock(node, rdbi.getBlock(),
+              ReplicaState.RBW, null);
+          break;
+        default:
+          String msg = 
+            "Unknown block status code reported by " + nodeID.getName() +
+            ": " + rdbi;
+          NameNode.stateChangeLog.warn(msg);
+          assert false : msg; // if assertions are enabled, throw.
+          break;
         }
         if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("BLOCK* block"
-              + (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
-                  : "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
+          NameNode.stateChangeLog.debug("BLOCK* block "
+              + (rdbi.getStatus()) + ": " + rdbi.getBlock()
               + " is received from " + nodeID.getName());
         }
       }
     } finally {
       namesystem.writeUnlock();
       NameNode.stateChangeLog
-          .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
-              + nodeID.getName() + " received: " + received + ", "
+          .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+              + nodeID.getName()
+              +  " receiving: " + receiving + ", "
+              + " received: " + received + ", "
               + " deleted: " + deleted);
     }
   }

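After this refactoring, the finalized-block path (addBlock) and the new RECEIVING_BLOCK case funnel into the same processAndHandleReportedBlock() logic, differing only in the ReplicaState passed in; an unknown status is logged and, with assertions enabled, fails fast. As a hypothetical helper using only types from this patch, a single incremental report can mix all three kinds:

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
    import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;

    public class IncrementalReportExample {
      /**
       * Builds one incremental report carrying all three status kinds the
       * new dispatch in processIncrementalBlockReport() understands.
       */
      static ReceivedDeletedBlockInfo[] buildReport(
          Block receiving, Block received, String delHint, Block deleted) {
        return new ReceivedDeletedBlockInfo[] {
            // Pipeline just opened: the NN learns of the RBW replica at once.
            new ReceivedDeletedBlockInfo(
                receiving, BlockStatus.RECEIVING_BLOCK, null),
            // Write finished: replica is FINALIZED; delHint may name a node
            // whose now-excess replica can be invalidated.
            new ReceivedDeletedBlockInfo(
                received, BlockStatus.RECEIVED_BLOCK, delHint),
            // Replica deleted locally: the NN drops it from its blocks map.
            new ReceivedDeletedBlockInfo(
                deleted, BlockStatus.DELETED_BLOCK, null),
        };
      }
    }
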
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Mon Jan 30 19:16:15 2012
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -202,10 +203,13 @@ class BPOfferService {
   void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
     checkBlock(block);
     checkDelHint(delHint);
-    ReceivedDeletedBlockInfo bInfo = 
-               new ReceivedDeletedBlockInfo(block.getLocalBlock(), delHint);
+    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
+        block.getLocalBlock(),
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
+        delHint);
+
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeReceivedBlock(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo);
     }
   }
 
@@ -224,13 +228,24 @@ class BPOfferService {
 
   void notifyNamenodeDeletedBlock(ExtendedBlock block) {
     checkBlock(block);
-    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(block
-          .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT);
+    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
+       block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
     
     for (BPServiceActor actor : bpServices) {
       actor.notifyNamenodeDeletedBlock(bInfo);
     }
   }
+  
+  void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+    checkBlock(block);
+    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
+       block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
+    
+    for (BPServiceActor actor : bpServices) {
+      actor.notifyNamenodeBlockImmediately(bInfo);
+    }
+  }
+
 
   //This must be called only by blockPoolManager
   void start() {

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Mon Jan 30 19:16:15 2012
@@ -267,7 +267,7 @@ class BPServiceActor implements Runnable
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeReceivedBlock(ReceivedDeletedBlockInfo bInfo) {
+  void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
     synchronized (receivedAndDeletedBlockList) {
       receivedAndDeletedBlockList.add(bInfo);
       pendingReceivedRequests++;
@@ -341,6 +341,12 @@ class BPServiceActor implements Runnable
     long startTime = now();
     if (startTime - lastBlockReport > dnConf.blockReportInterval) {
 
+      // Flush any block information that precedes the block report. Otherwise
+      // we have a chance that we will miss the delHint information
+      // or we will report an RBW replica after the BlockReport already reports
+      // a FINALIZED one.
+      reportReceivedDeletedBlocks();
+
       // Create block report
       long brCreateStartTime = now();
       BlockListAsLongs bReport = dn.getFSDataset().getBlockReport(

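The new reportReceivedDeletedBlocks() call exists because the incremental and full channels describe the same replicas, and the last message to arrive wins. A self-contained toy (not HDFS code) that models the race the comment warns about:

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class ReportOrderingToy {
      enum State { RBW, FINALIZED }

      public static void main(String[] args) {
        Queue<State> pending = new ArrayDeque<State>();

        // Wrong order: full block report first, queued notification after.
        pending.add(State.RBW);            // queued when the pipeline opened
        State nnView = State.FINALIZED;    // full report says FINALIZED
        while (!pending.isEmpty()) {
          nnView = pending.poll();         // stale RBW overwrites FINALIZED
        }
        System.out.println("report-first: NN sees " + nnView);   // RBW

        // Patched order: drain the queue, then send the full report.
        pending.add(State.RBW);
        while (!pending.isEmpty()) {
          pending.poll();                  // flushed ahead of the report
        }
        nnView = State.FINALIZED;          // full report has the last word
        System.out.println("flush-first: NN sees " + nnView);    // FINALIZED
      }
    }
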
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Jan 30 19:16:15 2012
@@ -153,6 +153,7 @@ class BlockReceiver implements Closeable
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
           replicaInfo = datanode.data.createRbw(block);
+          datanode.notifyNamenodeReceivingBlock(block);
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
           replicaInfo = datanode.data.recoverRbw(
@@ -166,6 +167,7 @@ class BlockReceiver implements Closeable
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
+          datanode.notifyNamenodeReceivingBlock(block);
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
           replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@@ -174,6 +176,7 @@ class BlockReceiver implements Closeable
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
+          datanode.notifyNamenodeReceivingBlock(block);
           break;
         case TRANSFER_RBW:
         case TRANSFER_FINALIZED:

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Jan 30 19:16:15 2012
@@ -522,7 +522,18 @@ public class DataNode extends Configured
     if(bpos != null) {
       bpos.notifyNamenodeReceivedBlock(block, delHint); 
     } else {
-      LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+      LOG.error("Cannot find BPOfferService for reporting block received for bpid="
+          + block.getBlockPoolId());
+    }
+  }
+  
+  // calls specific to BP
+  protected void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+    if(bpos != null) {
+      bpos.notifyNamenodeReceivingBlock(block); 
+    } else {
+      LOG.error("Cannot find BPOfferService for reporting block receiving for bpid="
           + block.getBlockPoolId());
     }
   }
@@ -533,7 +544,7 @@ public class DataNode extends Configured
     if (bpos != null) {
       bpos.notifyNamenodeDeletedBlock(block);
     } else {
-      LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
+      LOG.error("Cannot find BPOfferService for reporting block deleted for bpid="
           + block.getBlockPoolId());
     }
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Jan 30 19:16:15 2012
@@ -4933,7 +4933,7 @@ public class FSNamesystem implements Nam
                     + m.getNodeReg().getName() + " "
                     + m.getReceivedAndDeletedBlocks().length + " blocks.");
           }
-          this.getBlockManager().blockReceivedAndDeleted(m.getNodeReg(),
+          this.getBlockManager().processIncrementalBlockReport(m.getNodeReg(),
               m.getPoolId(), m.getReceivedAndDeletedBlocks());
           break;
         case BLOCK_REPORT:

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Mon Jan 30 19:16:15 2012
@@ -928,7 +928,7 @@ class NameNodeRpcServer implements Namen
           +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
           +" blocks.");
     }
-    namesystem.getBlockManager().blockReceivedAndDeleted(
+    namesystem.getBlockManager().processIncrementalBlockReport(
         nodeReg, poolId, receivedAndDeletedBlocks);
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java Mon Jan 30 19:16:15 2012
@@ -25,22 +25,47 @@ import java.io.IOException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
- * A data structure to store Block and delHints together, used to send
- * received/deleted ACKs.
+ * A data structure to store the blocks in an incremental block report. 
  */
 public class ReceivedDeletedBlockInfo implements Writable {
   Block block;
+  BlockStatus status;
   String delHints;
 
-  public final static String TODELETE_HINT = "-";
+  public static enum BlockStatus {
+    RECEIVING_BLOCK(1),
+    RECEIVED_BLOCK(2),
+    DELETED_BLOCK(3);
+    
+    private final int code;
+    BlockStatus(int code) {
+      this.code = code;
+    }
+    
+    public int getCode() {
+      return code;
+    }
+    
+    public static BlockStatus fromCode(int code) {
+      for (BlockStatus bs : BlockStatus.values()) {
+        if (bs.code == code) {
+          return bs;
+        }
+      }
+      return null;
+    }
+  }
 
   public ReceivedDeletedBlockInfo() {
   }
 
-  public ReceivedDeletedBlockInfo(Block blk, String delHints) {
+  public ReceivedDeletedBlockInfo(
+      Block blk, BlockStatus status, String delHints) {
     this.block = blk;
+    this.status = status;
     this.delHints = delHints;
   }
 
@@ -60,13 +85,19 @@ public class ReceivedDeletedBlockInfo im
     this.delHints = hints;
   }
 
+  public BlockStatus getStatus() {
+    return status;
+  }
+
   public boolean equals(Object o) {
     if (!(o instanceof ReceivedDeletedBlockInfo)) {
       return false;
     }
     ReceivedDeletedBlockInfo other = (ReceivedDeletedBlockInfo) o;
     return this.block.equals(other.getBlock())
-        && this.delHints.equals(other.delHints);
+        && this.status == other.status
+        && (this.delHints == other.delHints ||
+            this.delHints != null && this.delHints.equals(other.delHints));
   }
 
   public int hashCode() {
@@ -79,23 +110,30 @@ public class ReceivedDeletedBlockInfo im
   }
 
   public boolean isDeletedBlock() {
-    return delHints.equals(TODELETE_HINT);
+    return status == BlockStatus.DELETED_BLOCK;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     this.block.write(out);
-    Text.writeString(out, this.delHints);
+    WritableUtils.writeVInt(out, this.status.code);
+    if (this.status == BlockStatus.DELETED_BLOCK) {
+      Text.writeString(out, this.delHints);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     this.block = new Block();
     this.block.readFields(in);
-    this.delHints = Text.readString(in);
+    this.status = BlockStatus.fromCode(WritableUtils.readVInt(in));
+    if (this.status == BlockStatus.DELETED_BLOCK) {
+      this.delHints = Text.readString(in);
+    }
   }
 
   public String toString() {
-    return block.toString() + ", delHint: " + delHints;
+    return block.toString() + ", status: " + status +
+      ", delHint: " + delHints;
   }
 }

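The Writable format above is: the block, then the status code as a vint, then delHints only when the status is DELETED_BLOCK. A hedged round-trip sketch (hypothetical test code, assuming only the fields and methods shown in this diff):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
    import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;

    public class WritableRoundTrip {
      public static void main(String[] args) throws Exception {
        ReceivedDeletedBlockInfo sent = new ReceivedDeletedBlockInfo(
            new Block(7L, 512L, 1L), BlockStatus.DELETED_BLOCK, "dn-to-prune");

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        sent.write(new DataOutputStream(buf)); // block, vint status, delHints

        ReceivedDeletedBlockInfo got = new ReceivedDeletedBlockInfo();
        got.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));

        // Run with -ea: status and (for DELETED_BLOCK) delHints both survive.
        assert sent.equals(got);
      }
    }

One sharp edge visible in the diff: BPOfferService now builds DELETED_BLOCK entries with a null delHints, while write() serializes delHints exactly when the status is DELETED_BLOCK, so this Writable path would NPE on such an entry if it were exercised.
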
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java Mon Jan 30 19:16:15 2012
@@ -24,8 +24,10 @@ import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocolR23Compatible.BlockWritable;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * A data structure to store Block and delHints together, used to send
@@ -33,33 +35,43 @@ import org.apache.hadoop.io.Writable;
  */
 public class ReceivedDeletedBlockInfoWritable implements Writable {
   BlockWritable block;
+  int statusCode;
   String delHints;
 
-  public final static String TODELETE_HINT = "-";
 
   public ReceivedDeletedBlockInfoWritable() {
   }
 
-  public ReceivedDeletedBlockInfoWritable(BlockWritable blk, String delHints) {
+  public ReceivedDeletedBlockInfoWritable(
+      BlockWritable blk, int statusCode, String delHints) {
     this.block = blk;
+    this.statusCode = statusCode;
     this.delHints = delHints;
   }
 
+
   @Override
   public void write(DataOutput out) throws IOException {
     this.block.write(out);
-    Text.writeString(out, this.delHints);
+    WritableUtils.writeVInt(out, this.statusCode);
+    if (this.statusCode == BlockStatus.DELETED_BLOCK.getCode()) {
+      Text.writeString(out, this.delHints);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     this.block = new BlockWritable();
     this.block.readFields(in);
-    this.delHints = Text.readString(in);
+    this.statusCode = WritableUtils.readVInt(in);
+    if (this.statusCode == BlockStatus.DELETED_BLOCK.getCode()) {
+      this.delHints = Text.readString(in);
+    }
   }
 
   public String toString() {
-    return block.toString() + ", delHint: " + delHints;
+    return block.toString() + ", statusCode: " + statusCode +
+      ", delHint: " + delHints;
   }
 
   public static ReceivedDeletedBlockInfo[] convert(
@@ -83,13 +95,16 @@ public class ReceivedDeletedBlockInfoWri
   }
 
   public ReceivedDeletedBlockInfo convert() {
-    return new ReceivedDeletedBlockInfo(block.convert(), delHints);
+    return new ReceivedDeletedBlockInfo(block.convert(),
+        BlockStatus.fromCode(statusCode), delHints);
   }
 
   public static ReceivedDeletedBlockInfoWritable convert(
       ReceivedDeletedBlockInfo b) {
     if (b == null) return null;
-    return new ReceivedDeletedBlockInfoWritable(BlockWritable.convert(b
-        .getBlock()), b.getDelHints());
+    return new ReceivedDeletedBlockInfoWritable(
+        BlockWritable.convert(b.getBlock()),
+        b.getStatus().getCode(),
+        b.getDelHints());
   }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Mon Jan 30 19:16:15 2012
@@ -213,12 +213,16 @@ message BlockReportResponseProto {
 /**
  * Data structure to send received or deleted block information
  * from datanode to namenode.
- *
- * deleteHint set to "-" indicates block deletion.
- * other deleteHint indicates block addition.
  */
 message ReceivedDeletedBlockInfoProto {
+  enum BlockStatus {
+    RECEIVING = 1; // block being created
+    RECEIVED = 2; // block creation complete
+    DELETED = 3;
+  }
+
   required BlockProto block = 1;
+  required BlockStatus status = 3;
   optional string deleteHint = 2;
 }
 
@@ -329,7 +333,9 @@ service DatanodeProtocolService {
   rpc blockReport(BlockReportRequestProto) returns(BlockReportResponseProto);
 
   /**
-   * Report from datanode about recently received or deleted block
+   * Incremental block report from the DN. This contains info about recently
+   * received and deleted blocks, as well as when blocks start being
+   * received.
    */
   rpc blockReceivedAndDeleted(BlockReceivedAndDeletedRequestProto) 
       returns(BlockReceivedAndDeletedResponseProto);

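In the message above, status takes tag 3 even though it is declared before deleteHint (tag 2): protobuf identifies fields by tag number on the wire, so a field added later must take the next unused number regardless of where it appears in the .proto text. Making the new field required is safe here only because both ends of this unreleased branch upgrade together; an old sender omitting tag 3 would fail the new parser's initialization check. A hedged sketch of building the updated message from Java (the generated class and package names are assumptions based on this tree's protoc options):

    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;

    public class ReceivingProtoExample {
      /** Builds a RECEIVING notification; deleteHint (tag 2) stays unset. */
      static ReceivedDeletedBlockInfoProto receiving(BlockProto block) {
        return ReceivedDeletedBlockInfoProto.newBuilder()
            .setBlock(block)   // required, tag 1
            .setStatus(ReceivedDeletedBlockInfoProto.BlockStatus.RECEIVING)
            .build();
      }
    }
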
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java Mon Jan 30 19:16:15 2012
@@ -113,8 +113,14 @@ public class AppendTestUtil {
     int i = -1;
     try {
       final FileStatus status = fs.getFileStatus(p);
-      TestCase.assertEquals(length, status.getLen());
-      InputStream in = fs.open(p);
+      FSDataInputStream in = fs.open(p);
+      if (in.getWrappedStream() instanceof DFSInputStream) {
+        long len = ((DFSInputStream)in.getWrappedStream()).getFileLength();
+        TestCase.assertEquals(length, len);
+      } else {
+        TestCase.assertEquals(length, status.getLen());
+      }
+      
       for(i++; i < length; i++) {
         TestCase.assertEquals((byte)i, (byte)in.read());  
       }

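The check above switches from the NameNode's metadata length to DFSInputStream.getFileLength() where possible. The distinction matters after a mid-write failover: for a file whose last block is still under construction, FileStatus.getLen() reflects only what the NN has recorded, while DFSInputStream computes the last block's length from the datanodes. The same probe as a standalone helper (a hedged sketch; the class name is hypothetical, and it sits in the same package in case getFileLength() is package-private in this tree):

    package org.apache.hadoop.hdfs;

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class VisibleLength {
      /** Length as readable right now; prefers the DN-backed view. */
      static long visibleLength(FileSystem fs, Path p) throws IOException {
        FSDataInputStream in = fs.open(p);
        try {
          if (in.getWrappedStream() instanceof DFSInputStream) {
            // Computed from the DNs, so it covers an under-construction
            // last block that the NN has not yet recorded.
            return ((DFSInputStream) in.getWrappedStream()).getFileLength();
          }
          return fs.getFileStatus(p).getLen(); // NN-recorded length only
        } finally {
          in.close();
        }
      }
    }
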
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Mon Jan 30 19:16:15 2012
@@ -884,7 +884,8 @@ public class NNThroughputBenchmark {
           nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
               .getNamesystem().getBlockPoolId(),
               new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
-                  blocks[i], DataNode.EMPTY_DEL_HINT) });
+                  blocks[i], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
+                  null) });
         }
       }
       return blocks.length;
@@ -999,7 +1000,8 @@ public class NNThroughputBenchmark {
           nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
               .getBlock().getBlockPoolId(),
               new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(loc
-                  .getBlock().getLocalBlock(), "") });
+                  .getBlock().getLocalBlock(),
+                  ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) });
         }
       }
       return prevBlock;

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java?rev=1237935&r1=1237934&r2=1237935&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java Mon Jan 30 19:16:15 2012
@@ -107,7 +107,9 @@ public class TestDeadDatanode {
     DatanodeProtocol dnp = cluster.getNameNodeRpc();
     
     ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
-        new Block(0), "") };
+        new Block(0), 
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
+        null) };
     
     // Ensure blockReceived call from dead datanode is rejected with IOException
     try {

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java?rev=1237935&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java Mon Jan 30 19:16:15 2012
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Level;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test cases regarding pipeline recovery during NN failover.
+ */
+public class TestPipelinesFailover {
+  static {
+    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(BlockManager.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(
+        "org.apache.hadoop.io.retry.RetryInvocationHandler")).getLogger().setLevel(Level.ALL);
+
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+  }
+  
+  protected static final Log LOG = LogFactory.getLog(
+      TestPipelinesFailover.class);
+  private static final Path TEST_PATH =
+    new Path("/test-file");
+  private static final int BLOCK_SIZE = 4096;
+  private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2;
+
+  /**
+   * Tests continuing a write pipeline over a failover.
+   */
+  @Test(timeout=30000)
+  public void testWriteOverFailover() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // Don't check replication periodically.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
+    
+    FSDataOutputStream stm = null;
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(3)
+      .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      Thread.sleep(500);
+
+      LOG.info("Starting with NN 0 active");
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      stm = fs.create(TEST_PATH);
+      
+      // write a block and a half
+      AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
+      
+      // Make sure all of the blocks are written out before failover.
+      stm.hflush();
+
+      LOG.info("Failing over to NN 1");
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+
+      assertTrue(fs.exists(TEST_PATH));
+      FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
+      BlockManagerTestUtil.updateState(ns1.getBlockManager());
+      assertEquals(0, ns1.getPendingReplicationBlocks());
+      assertEquals(0, ns1.getCorruptReplicaBlocks());
+      assertEquals(0, ns1.getMissingBlocksCount());
+
+      // write another block and a half
+      AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
+
+      stm.close();
+      stm = null;
+      
+      AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE * 3);
+    } finally {
+      IOUtils.closeStream(stm);
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * Tests continuing a write pipeline over a failover when a DN fails
+   * after the failover - ensures that updating the pipeline succeeds
+   * even when the pipeline was constructed on a different NN.
+   */
+  @Test(timeout=30000)
+  public void testWriteOverFailoverWithDnFail() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    
+    FSDataOutputStream stm = null;
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(5)
+      .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      Thread.sleep(500);
+
+      LOG.info("Starting with NN 0 active");
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      stm = fs.create(TEST_PATH);
+      
+      // write a block and a half
+      AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
+      
+      // Make sure all the blocks are written before failover
+      stm.hflush();
+
+      LOG.info("Failing over to NN 1");
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+
+      assertTrue(fs.exists(TEST_PATH));
+      
+      cluster.stopDataNode(0);
+
+      // write another block and a half
+      AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
+      stm.hflush(); // TODO: see above
+      
+      LOG.info("Failing back to NN 0");
+      cluster.transitionToStandby(1);
+      cluster.transitionToActive(0);
+      
+      cluster.stopDataNode(1);
+      
+      AppendTestUtil.write(stm, BLOCK_AND_A_HALF*2, BLOCK_AND_A_HALF);
+      stm.hflush(); // TODO: see above
+      
+      
+      stm.close();
+      stm = null;
+      
+      AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF * 3);
+    } finally {
+      IOUtils.closeStream(stm);
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * Tests lease recovery if a client crashes. This approximates the
+   * use case of HBase WALs being recovered after a NN failover.
+   */
+  @Test(timeout=30000)
+  public void testLeaseRecoveryAfterFailover() throws Exception {
+    final Configuration conf = new Configuration();
+    // Disable permissions so that another user can recover the lease.
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    
+    FSDataOutputStream stm = null;
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(3)
+      .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      Thread.sleep(500);
+
+      LOG.info("Starting with NN 0 active");
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      stm = fs.create(TEST_PATH);
+      
+      // write a block and a half
+      AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
+      stm.hflush();
+      
+      LOG.info("Failing over to NN 1");
+      
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      
+      assertTrue(fs.exists(TEST_PATH));
+      
+      FileSystem fsOtherUser = UserGroupInformation.createUserForTesting(
+          "otheruser", new String[] { "othergroup"})
+          .doAs(new PrivilegedExceptionAction<FileSystem>() {
+            @Override
+            public FileSystem run() throws Exception {
+              return HATestUtil.configureFailoverFs(cluster, conf);
+            }
+          });
+      ((DistributedFileSystem)fsOtherUser).recoverLease(TEST_PATH);
+      
+      AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
+    } finally {
+      IOUtils.closeStream(stm);
+      cluster.shutdown();
+    }
+  }
+
+}