Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/12/21 05:32:41 UTC

svn commit: r1221608 - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/serv...

Author: todd
Date: Wed Dec 21 04:32:40 2011
New Revision: 1221608

URL: http://svn.apache.org/viewvc?rev=1221608&view=rev
Log:
HDFS-1972. Fencing mechanism for block invalidations and replications. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ActiveState.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Wed Dec 21 04:32:40 2011
@@ -73,3 +73,5 @@ HDFS-2678. When a FailoverProxyProvider 
 HDFS-2682. When a FailoverProxyProvider is used, Client should not retry for 45 times if it is timing out to connect to server. (Uma Maheswara Rao G via todd)
 
 HDFS-2693. Fix synchronization issues around state transition (todd)
+
+HDFS-1972. Fencing mechanism for block invalidations and replications (todd)

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Dec 21 04:32:40 2011
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -68,6 +69,8 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -91,6 +94,7 @@ public class BlockManager {
   private volatile long underReplicatedBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
   private volatile long excessBlocksCount = 0L;
+  private volatile long postponedMisreplicatedBlocksCount = 0L;
   
   /** Used by metrics */
   public long getPendingReplicationBlocksCount() {
@@ -116,6 +120,10 @@ public class BlockManager {
   public long getExcessBlocksCount() {
     return excessBlocksCount;
   }
+  /** Used by metrics */
+  public long getPostponedMisreplicatedBlocksCount() {
+    return postponedMisreplicatedBlocksCount;
+  }
 
   /**replicationRecheckInterval is how often namenode checks for new replication work*/
   private final long replicationRecheckInterval;
@@ -134,6 +142,15 @@ public class BlockManager {
 
   /** Blocks to be invalidated. */
   private final InvalidateBlocks invalidateBlocks;
+  
+  /**
+   * After a failover, over-replicated blocks are not handled
+   * until all of the nodes holding their replicas have sent a
+   * block report to the new active. This makes sure that this
+   * NameNode has been notified of all block deletions that might
+   * have been pending when the failover happened.
+   */
+  private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
 
   //
   // Keeps a TreeSet for every named node. Each treeset contains
@@ -316,49 +333,15 @@ public class BlockManager {
       out.println("Metasave: Blocks waiting for replication: " + 
                   neededReplications.size());
       for (Block block : neededReplications) {
-        List<DatanodeDescriptor> containingNodes =
-                                          new ArrayList<DatanodeDescriptor>();
-        List<DatanodeDescriptor> containingLiveReplicasNodes =
-          new ArrayList<DatanodeDescriptor>();
-        
-        NumberReplicas numReplicas = new NumberReplicas();
-        // source node returned is not used
-        chooseSourceDatanode(block, containingNodes,
-            containingLiveReplicasNodes, numReplicas);
-        assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
-        int usableReplicas = numReplicas.liveReplicas() +
-                             numReplicas.decommissionedReplicas();
-       
-        if (block instanceof BlockInfo) {
-          String fileName = ((BlockInfo)block).getINode().getFullPathName();
-          out.print(fileName + ": ");
-        }
-        // l: == live:, d: == decommissioned c: == corrupt e: == excess
-        out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
-                  " (replicas:" +
-                  " l: " + numReplicas.liveReplicas() +
-                  " d: " + numReplicas.decommissionedReplicas() +
-                  " c: " + numReplicas.corruptReplicas() +
-                  " e: " + numReplicas.excessReplicas() + ") "); 
-
-        Collection<DatanodeDescriptor> corruptNodes = 
-                                      corruptReplicas.getNodes(block);
-        
-        for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-             jt.hasNext();) {
-          DatanodeDescriptor node = jt.next();
-          String state = "";
-          if (corruptNodes != null && corruptNodes.contains(node)) {
-            state = "(corrupt)";
-          } else if (node.isDecommissioned() || 
-              node.isDecommissionInProgress()) {
-            state = "(decommissioned)";
-          }          
-          out.print(" " + node + state + " : ");
-        }
-        out.println("");
+        dumpBlockMeta(block, out);
       }
     }
+    
+    // Dump any postponed over-replicated blocks
+    out.println("Mis-replicated blocks that have been postponed:");
+    for (Block block : postponedMisreplicatedBlocks) {
+      dumpBlockMeta(block, out);
+    }
 
     // Dump blocks from pendingReplication
     pendingReplications.metaSave(out);
@@ -369,6 +352,58 @@ public class BlockManager {
     // Dump all datanodes
     getDatanodeManager().datanodeDump(out);
   }
+  
+  /**
+   * Dump the metadata for the given block in a human-readable
+   * form.
+   */
+  private void dumpBlockMeta(Block block, PrintWriter out) {
+    List<DatanodeDescriptor> containingNodes =
+                                      new ArrayList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> containingLiveReplicasNodes =
+      new ArrayList<DatanodeDescriptor>();
+    
+    NumberReplicas numReplicas = new NumberReplicas();
+    // source node returned is not used
+    chooseSourceDatanode(block, containingNodes,
+        containingLiveReplicasNodes, numReplicas);
+    assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
+    int usableReplicas = numReplicas.liveReplicas() +
+                         numReplicas.decommissionedReplicas();
+    
+    if (block instanceof BlockInfo) {
+      String fileName = ((BlockInfo)block).getINode().getFullPathName();
+      out.print(fileName + ": ");
+    }
+    // l: == live, d: == decommissioned, c: == corrupt, e: == excess
+    out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
+              " (replicas:" +
+              " l: " + numReplicas.liveReplicas() +
+              " d: " + numReplicas.decommissionedReplicas() +
+              " c: " + numReplicas.corruptReplicas() +
+              " e: " + numReplicas.excessReplicas() + ") "); 
+
+    Collection<DatanodeDescriptor> corruptNodes = 
+                                  corruptReplicas.getNodes(block);
+    
+    for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+         jt.hasNext();) {
+      DatanodeDescriptor node = jt.next();
+      String state = "";
+      if (corruptNodes != null && corruptNodes.contains(node)) {
+        state = "(corrupt)";
+      } else if (node.isDecommissioned() || 
+          node.isDecommissionInProgress()) {
+        state = "(decommissioned)";
+      }
+      
+      if (node.areBlockContentsStale()) {
+        state += " (block deletions may be out of date)";
+      }
+      out.print(" " + node + state + " : ");
+    }
+    out.println("");
+  }
 
   /** @return maxReplicationStreams */
   public int getMaxReplicationStreams() {
@@ -782,6 +817,14 @@ public class BlockManager {
 
     node.resetBlocks();
     invalidateBlocks.remove(node.getStorageID());
+    
+    // If the DN hasn't block-reported since the most recent
+    // failover, then we may have been holding up on processing
+    // over-replicated blocks because of it. But we can now
+    // process those blocks.
+    if (node.areBlockContentsStale()) {
+      rescanPostponedMisreplicatedBlocks();
+    }
   }
 
   /**
@@ -879,10 +922,17 @@ public class BlockManager {
           + " because datanode " + dn.getName() + " does not exist.");
     }
 
-    // Check how many copies we have of the block. If we have at least one
-    // copy on a live node, then we can delete it.
-    int count = countNodes(blk).liveReplicas();
-    if (count >= 1) {
+    // Check how many copies we have of the block
+    NumberReplicas nr = countNodes(blk);
+    if (nr.replicasOnStaleNodes() > 0) {
+      NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: postponing " +
+          "invalidation of block " + blk + " on " + dn + " because " +
+          nr.replicasOnStaleNodes() + " replica(s) are located on nodes " +
+          "with potentially out-of-date block reports.");
+      postponeBlock(blk);
+
+    } else if (nr.liveReplicas() >= 1) {
+      // If we have at least one copy on a live node, then we can delete it.
       addToInvalidates(blk, dn);
       removeStoredBlock(blk, node);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -895,6 +945,13 @@ public class BlockManager {
     }
   }
 
+  private void postponeBlock(Block blk) {
+    if (postponedMisreplicatedBlocks.add(blk)) {
+      postponedMisreplicatedBlocksCount++;
+    }
+  }
+  
+  
   void updateState() {
     pendingReplicationBlocksCount = pendingReplications.size();
     underReplicatedBlocksCount = neededReplications.size();
@@ -933,7 +990,7 @@ public class BlockManager {
    *
    * @return number of blocks scheduled for replication during this iteration.
    */
-  private int computeReplicationWork(int blocksToProcess) throws IOException {
+  int computeReplicationWork(int blocksToProcess) throws IOException {
     List<List<Block>> blocksToReplicate = null;
     namesystem.writeLock();
     try {
@@ -984,8 +1041,10 @@ public class BlockManager {
             NumberReplicas numReplicas = new NumberReplicas();
             srcNode = chooseSourceDatanode(
                 block, containingNodes, liveReplicaNodes, numReplicas);
-            if(srcNode == null) // block can not be replicated from any node
+            if(srcNode == null) { // block can not be replicated from any node
+              LOG.debug("Block " + block + " cannot be repl from any node");
               continue;
+          }
 
             assert liveReplicaNodes.size() == numReplicas.liveReplicas();
             // do not schedule more if enough replicas is already pending
@@ -1235,7 +1294,7 @@ public class BlockManager {
         srcNode = node;
     }
     if(numReplicas != null)
-      numReplicas.initialize(live, decommissioned, corrupt, excess);
+      numReplicas.initialize(live, decommissioned, corrupt, excess, 0);
     return srcNode;
   }
 
@@ -1316,6 +1375,19 @@ public class BlockManager {
       } else {
         processReport(node, newReport);
       }
+      
+      // Now that we have an up-to-date block report, we know that any
+      // deletions from a previous NN iteration have been accounted for.
+      boolean staleBefore = node.areBlockContentsStale();
+      node.receivedBlockReport();
+      if (staleBefore && !node.areBlockContentsStale()) {
+        LOG.info("BLOCK* processReport: " +
+            "Received first block report from " + node +
+            " after becoming active. Its block contents are no longer" +
+            " considered stale.");
+        rescanPostponedMisreplicatedBlocks();
+      }
+      
     } finally {
       endTime = Util.now();
       namesystem.writeUnlock();
@@ -1328,6 +1400,37 @@ public class BlockManager {
         + ", processing time: " + (endTime - startTime) + " msecs");
   }
 
+  /**
+   * Rescan the list of blocks which were previously postponed.
+   */
+  private void rescanPostponedMisreplicatedBlocks() {
+    for (Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
+         it.hasNext();) {
+      Block b = it.next();
+      
+      BlockInfo bi = blocksMap.getStoredBlock(b);
+      if (bi == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
+              "Postponed mis-replicated block " + b + " no longer found " +
+              "in block map.");
+        }
+        it.remove();
+        postponedMisreplicatedBlocksCount--;
+        continue;
+      }
+      MisReplicationResult res = processMisReplicatedBlock(bi);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
+            "Re-scanned block " + b + ", result is " + res);
+      }
+      if (res != MisReplicationResult.POSTPONE) {
+        it.remove();
+        postponedMisreplicatedBlocksCount--;
+      }
+    }
+  }
+  
   private void processReport(final DatanodeDescriptor node,
       final BlockListAsLongs report) throws IOException {
     // Normal case:
@@ -1505,8 +1608,9 @@ public class BlockManager {
 
     // Ignore replicas already scheduled to be removed from the DN
     if(invalidateBlocks.contains(dn.getStorageID(), block)) {
-      assert storedBlock.findDatanode(dn) < 0 : "Block " + block
-        + " in recentInvalidatesSet should not appear in DN " + dn;
+      /* TODO: the following assertion is incorrect, see HDFS-2668
+      assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+          + " in recentInvalidatesSet should not appear in DN " + dn; */
       return storedBlock;
     }
 
@@ -1773,41 +1877,81 @@ public class BlockManager {
   public void processMisReplicatedBlocks() {
     assert namesystem.hasWriteLock();
 
-    long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
+    long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0, nrPostponed = 0;
     neededReplications.clear();
     for (BlockInfo block : blocksMap.getBlocks()) {
-      INodeFile fileINode = block.getINode();
-      if (fileINode == null) {
-        // block does not belong to any file
-        nrInvalid++;
-        addToInvalidates(block);
-        continue;
-      }
-      // calculate current replication
-      short expectedReplication = fileINode.getReplication();
-      NumberReplicas num = countNodes(block);
-      int numCurrentReplica = num.liveReplicas();
-      // add to under-replicated queue if need to be
-      if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
-        if (neededReplications.add(block, numCurrentReplica, num
-            .decommissionedReplicas(), expectedReplication)) {
-          nrUnderReplicated++;
-        }
-      }
-
-      if (numCurrentReplica > expectedReplication) {
-        // over-replicated block
+      MisReplicationResult res = processMisReplicatedBlock(block);
+      LOG.info("block " + block + ": " + res);
+      switch (res) {
+      case UNDER_REPLICATED:
+        nrUnderReplicated++;
+        break;
+      case OVER_REPLICATED:
         nrOverReplicated++;
-        processOverReplicatedBlock(block, expectedReplication, null, null);
+        break;
+      case INVALID:
+        nrInvalid++;
+        break;
+      case POSTPONE:
+        nrPostponed++;
+        postponeBlock(block);
+        break;
+      case OK:
+        break;
+      default:
+        throw new AssertionError("Invalid enum value: " + res);
       }
     }
-
+    
     LOG.info("Total number of blocks            = " + blocksMap.size());
     LOG.info("Number of invalid blocks          = " + nrInvalid);
     LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
-    LOG.info("Number of  over-replicated blocks = " + nrOverReplicated);
+    LOG.info("Number of  over-replicated blocks = " + nrOverReplicated +
+        ((nrPostponed > 0) ? ( " (" + nrPostponed + " postponed)") : ""));
   }
 
+  /**
+   * Process a single possibly misreplicated block. This adds it to the
+   * appropriate queues if necessary, and returns a result code indicating
+   * what happened with it.
+   */
+  private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
+    INodeFile fileINode = block.getINode();
+    if (fileINode == null) {
+      // block does not belong to any file
+      addToInvalidates(block);
+      return MisReplicationResult.INVALID;
+    }
+    // calculate current replication
+    short expectedReplication = fileINode.getReplication();
+    NumberReplicas num = countNodes(block);
+    int numCurrentReplica = num.liveReplicas();
+    // add to under-replicated queue if need to be
+    if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
+      if (neededReplications.add(block, numCurrentReplica, num
+          .decommissionedReplicas(), expectedReplication)) {
+        return MisReplicationResult.UNDER_REPLICATED;
+      }
+    }
+
+    if (numCurrentReplica > expectedReplication) {
+      if (num.replicasOnStaleNodes() > 0) {
+        // If any of the replicas of this block are on nodes that are
+        // considered "stale", then these replicas may in fact have
+        // already been deleted. So, we cannot safely act on the
+        // over-replication until a later point in time, when
+        // the "stale" nodes have block reported.
+        return MisReplicationResult.POSTPONE;
+      }
+      
+      // over-replicated block
+      processOverReplicatedBlock(block, expectedReplication, null, null);
+      return MisReplicationResult.OVER_REPLICATED;
+    }
+    
+    return MisReplicationResult.OK;
+  }
+  
   /** Set replication for the blocks. */
   public void setReplication(final short oldRepl, final short newRepl,
       final String src, final Block... blocks) throws IOException {
@@ -1851,6 +1995,14 @@ public class BlockManager {
     for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
          it.hasNext();) {
       DatanodeDescriptor cur = it.next();
+      if (cur.areBlockContentsStale()) {
+        LOG.info("BLOCK* processOverReplicatedBlock: " +
+            "Postponing processing of over-replicated block " +
+            block + " since datanode " + cur + " does not yet have up-to-date " +
+            "block information.");
+        postponeBlock(block);
+        return;
+      }
       LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
           .getStorageID());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
@@ -2153,13 +2305,15 @@ public class BlockManager {
   }
 
   /**
-   * Return the number of nodes that are live and decommissioned.
+   * Return the number of nodes hosting a given block, grouped
+   * by the state of those replicas.
    */
   public NumberReplicas countNodes(Block b) {
-    int count = 0;
+    int decommissioned = 0;
     int live = 0;
     int corrupt = 0;
     int excess = 0;
+    int stale = 0;
     Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
     while (nodeIter.hasNext()) {
@@ -2167,7 +2321,7 @@ public class BlockManager {
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
         corrupt++;
       } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-        count++;
+        decommissioned++;
       } else {
         LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
             .getStorageID());
@@ -2177,8 +2331,11 @@ public class BlockManager {
           live++;
         }
       }
+      if (node.areBlockContentsStale()) {
+        stale++;
+      }
     }
-    return new NumberReplicas(live, count, corrupt, excess);
+    return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
   }
 
   /** 
@@ -2323,10 +2480,14 @@ public class BlockManager {
   }
 
   public void removeBlock(Block block) {
+    assert namesystem.hasWriteLock();
     block.setNumBytes(BlockCommand.NO_ACK);
     addToInvalidates(block);
     corruptReplicas.removeFromCorruptReplicasMap(block);
     blocksMap.removeBlock(block);
+    if (postponedMisreplicatedBlocks.remove(block)) {
+      postponedMisreplicatedBlocksCount--;
+    }
   }
 
   public BlockInfo getStoredBlock(Block block) {
@@ -2387,8 +2548,10 @@ public class BlockManager {
     namesystem.writeLock();
     try {
       // blocks should not be replicated or removed if safe mode is on
-      if (namesystem.isInSafeMode())
+      if (namesystem.isInSafeMode()) {
+        LOG.debug("In safemode, not computing replication work");
         return 0;
+      }
       // get blocks to invalidate for the nodeId
       assert nodeId != null;
       return invalidateBlocks.invalidateWork(nodeId);
@@ -2571,6 +2734,19 @@ public class BlockManager {
     return workFound;
   }
 
+  /**
+   * Clear all queues that hold decisions previously made by
+   * this NameNode.
+   */
+  public void clearQueues() {
+    neededReplications.clear();
+    pendingReplications.clear();
+    excessReplicateMap.clear();
+    invalidateBlocks.clear();
+    datanodeManager.clearPendingQueues();
+  }
+  
+
   private static class ReplicationWork {
 
     private Block block;
@@ -2601,4 +2777,22 @@ public class BlockManager {
       this.targets = null;
     }
   }
+
+  /**
+   * A simple result enum for the result of
+   * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
+   */
+  enum MisReplicationResult {
+    /** The block should be invalidated since it belongs to a deleted file. */
+    INVALID,
+    /** The block is currently under-replicated. */
+    UNDER_REPLICATED,
+    /** The block is currently over-replicated. */
+    OVER_REPLICATED,
+    /** A decision can't currently be made about this block. */
+    POSTPONE,
+    /** The block is properly replicated */
+    OK
+  }
+
 }
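
For illustration, here is a minimal, self-contained sketch of the
postpone-and-rescan pattern this patch introduces in BlockManager. All
types and names below are stand-ins for the real HDFS classes; the real
logic additionally runs under the namesystem write lock and maintains a
metrics counter:

    import java.util.HashSet;
    import java.util.Set;

    /** Toy model of HDFS-1972 fencing: defer destructive decisions on a
     *  block while any node holding a replica is still "stale". */
    class FencingSketch {
      static class Node {
        boolean stale = true;                     // stale until it block-reports
        final Set<String> blocks = new HashSet<String>();
      }

      private final Set<Node> nodes = new HashSet<Node>();
      private final Set<String> postponed = new HashSet<String>();

      /** Mirrors invalidateBlock(): act only if no replica is on a stale node. */
      void maybeInvalidate(String block) {
        for (Node n : nodes) {
          if (n.blocks.contains(block) && n.stale) {
            postponed.add(block);                 // cf. postponeBlock(blk)
            return;
          }
        }
        // In the real BlockManager this queues the replica for deletion.
      }

      /** Mirrors processReport(): clearing staleness triggers a rescan. */
      void onBlockReport(Node n) {
        boolean staleBefore = n.stale;
        n.stale = false;
        if (staleBefore) {
          rescanPostponed();          // cf. rescanPostponedMisreplicatedBlocks()
        }
      }

      private void rescanPostponed() {
        Set<String> toRescan = new HashSet<String>(postponed);
        postponed.clear();            // copy first: a rescan may re-postpone
        for (String block : toRescan) {
          maybeInvalidate(block);
        }
      }
    }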

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Dec 21 04:32:40 2011
@@ -60,7 +60,7 @@ public class BlockPlacementPolicyDefault
     initialize(conf, stats, clusterMap);
   }
 
-  BlockPlacementPolicyDefault() {
+  protected BlockPlacementPolicyDefault() {
   }
     
   @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Wed Dec 21 04:32:40 2011
@@ -94,6 +94,10 @@ public class DatanodeDescriptor extends 
     boolean contains(E e) {
       return blockq.contains(e);
     }
+
+    synchronized void clear() {
+      blockq.clear();
+    }
   }
 
   private volatile BlockInfo blockList = null;
@@ -103,6 +107,24 @@ public class DatanodeDescriptor extends 
   public boolean isAlive = false;
   public boolean needKeyUpdate = false;
 
+  /**
+   * Set to false on any NN failover, and reset to true
+   * whenever a heartbeat is received.
+   */
+  private boolean heartbeatedSinceFailover = false;
+  
+  /**
+   * At startup or at any failover, the DNs in the cluster may
+   * have pending block deletions from a previous incarnation
+   * of the NameNode. Thus, we consider their block contents
+   * stale until we have received a block report. When a DN
+   * is considered stale, any replicas on it are transitively
+   * considered stale. If any block has at least one stale replica,
+   * then no invalidations will be processed for this block.
+   * See HDFS-1972.
+   */
+  private boolean blockContentsStale = true;
+  
   // A system administrator can tune the balancer bandwidth parameter
   // (dfs.balance.bandwidthPerSec) dynamically by calling
   // "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
@@ -281,6 +303,14 @@ public class DatanodeDescriptor extends 
     this.invalidateBlocks.clear();
     this.volumeFailures = 0;
   }
+  
+  public void clearBlockQueues() {
+    synchronized (invalidateBlocks) {
+      this.invalidateBlocks.clear();
+      this.recoverBlocks.clear();
+      this.replicateBlocks.clear();
+    }
+  }
 
   public int numBlocks() {
     return numBlocks;
@@ -298,6 +328,7 @@ public class DatanodeDescriptor extends 
     this.lastUpdate = System.currentTimeMillis();
     this.xceiverCount = xceiverCount;
     this.volumeFailures = volFailures;
+    this.heartbeatedSinceFailover = true;
     rollBlocksScheduled(lastUpdate);
   }
 
@@ -564,5 +595,36 @@ public class DatanodeDescriptor extends 
     this.bandwidth = bandwidth;
   }
 
+  public boolean areBlockContentsStale() {
+    return blockContentsStale;
+  }
+
+  public void markStaleAfterFailover() {
+    heartbeatedSinceFailover = false;
+    blockContentsStale = true;
+  }
+
+  public void receivedBlockReport() {
+    if (heartbeatedSinceFailover) {
+      blockContentsStale = false;
+    }
+  }
 
+  @Override
+  public String dumpDatanode() {
+    StringBuilder sb = new StringBuilder(super.dumpDatanode());
+    int repl = replicateBlocks.size();
+    if (repl > 0) {
+      sb.append(" ").append(repl).append(" blocks to be replicated;");
+    }
+    int inval = invalidateBlocks.size();
+    if (inval > 0) {
+      sb.append(" ").append(inval).append(" blocks to be invalidated;");      
+    }
+    int recover = recoverBlocks.size();
+    if (recover > 0) {
+      sb.append(" ").append(recover).append(" blocks to be recovered;");
+    }
+    return sb.toString();
+  }
 }
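
The interplay of the two flags above is easiest to see in isolation.
Below is a standalone restatement (illustrative only; the real fields
live in DatanodeDescriptor, and heartbeatedSinceFailover is set by
updateHeartbeat()). A block report only clears the stale bit if the
node has also heartbeated since the failover, so a report that raced
with the failover itself is not trusted:

    class StaleFlagSketch {
      private boolean heartbeatedSinceFailover = false;
      private boolean blockContentsStale = true;

      void markStaleAfterFailover() {
        heartbeatedSinceFailover = false;
        blockContentsStale = true;
      }

      void heartbeat() {                  // cf. updateHeartbeat()
        heartbeatedSinceFailover = true;
      }

      void receivedBlockReport() {
        if (heartbeatedSinceFailover) {   // ignore reports racing the failover
          blockContentsStale = false;
        }
      }

      public static void main(String[] args) {
        StaleFlagSketch dn = new StaleFlagSketch();
        dn.markStaleAfterFailover();
        dn.receivedBlockReport();
        System.out.println("stale after early report: " + dn.blockContentsStale); // true
        dn.heartbeat();
        dn.receivedBlockReport();
        System.out.println("stale after hb + report:  " + dn.blockContentsStale); // false
      }
    }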

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Dec 21 04:32:40 2011
@@ -945,4 +945,27 @@ public class DatanodeManager {
       }
     }
   }
+  
+  public void markAllDatanodesStale() {
+    LOG.info("Marking all datandoes as stale");
+    synchronized (datanodeMap) {
+      for (DatanodeDescriptor dn : datanodeMap.values()) {
+        dn.markStaleAfterFailover();
+      }
+    }
+  }
+
+  /**
+   * Clear any actions that are queued up to be sent to the DNs
+   * on their next heartbeats. This includes block invalidations,
+   * recoveries, and replication requests.
+   */
+  public void clearPendingQueues() {
+    synchronized (datanodeMap) {
+      for (DatanodeDescriptor dn : datanodeMap.values()) {
+        dn.clearBlockQueues();
+      }
+    }
+  }
+
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Wed Dec 21 04:32:40 2011
@@ -160,4 +160,9 @@ class InvalidateBlocks {
     numBlocks -= toInvalidate.size();
     return toInvalidate;
   }
+  
+  synchronized void clear() {
+    node2blocks.clear();
+    numBlocks = 0;
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java Wed Dec 21 04:32:40 2011
@@ -26,20 +26,22 @@ public class NumberReplicas {
   private int decommissionedReplicas;
   private int corruptReplicas;
   private int excessReplicas;
+  private int replicasOnStaleNodes;
 
   NumberReplicas() {
-    initialize(0, 0, 0, 0);
+    initialize(0, 0, 0, 0, 0);
   }
 
-  NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
-    initialize(live, decommissioned, corrupt, excess);
+  NumberReplicas(int live, int decommissioned, int corrupt, int excess, int stale) {
+    initialize(live, decommissioned, corrupt, excess, stale);
   }
 
-  void initialize(int live, int decommissioned, int corrupt, int excess) {
+  void initialize(int live, int decommissioned, int corrupt, int excess, int stale) {
     liveReplicas = live;
     decommissionedReplicas = decommissioned;
     corruptReplicas = corrupt;
     excessReplicas = excess;
+    replicasOnStaleNodes = stale;
   }
 
   public int liveReplicas() {
@@ -54,4 +56,13 @@ public class NumberReplicas {
   public int excessReplicas() {
     return excessReplicas;
   }
+  
+  /**
+   * @return the number of replicas which are on stale nodes.
+   * This is not mutually exclusive with the other counts -- i.e. a
+   * replica may count as both "live" and "stale".
+   */
+  public int replicasOnStaleNodes() {
+    return replicasOnStaleNodes;
+  }
 } 
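
Since the stale count overlaps the live count, callers must test it
first. A compact, hypothetical restatement of the decision rule from
BlockManager.invalidateBlock() above:

    class StaleCountDemo {
      static String decide(int liveReplicas, int replicasOnStaleNodes) {
        if (replicasOnStaleNodes > 0) {
          return "postpone";        // a replica can be both live and stale
        } else if (liveReplicas >= 1) {
          return "invalidate";      // at least one trustworthy copy remains
        }
        return "keep";              // never delete the only known copy
      }

      public static void main(String[] args) {
        System.out.println(decide(2, 1));   // postpone
        System.out.println(decide(2, 0));   // invalidate
      }
    }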

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Wed Dec 21 04:32:40 2011
@@ -104,6 +104,14 @@ class PendingReplicationBlocks {
     }
   }
 
+
+  public void clear() {
+    synchronized (pendingReplications) {
+      pendingReplications.clear();
+      timedOutItems.clear();
+    }
+  }
+
   /**
    * The total number of blocks that are undergoing replication
    */

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Dec 21 04:32:40 2011
@@ -499,6 +499,16 @@ class BPOfferService {
   }
 
   /**
+   * Run an immediate deletion report on this thread. Used by tests.
+   */
+  @VisibleForTesting
+  void triggerDeletionReportForTests() throws IOException {
+    for (BPServiceActor actor : bpServices) {
+      actor.triggerDeletionReportForTests();
+    }
+  }
+
+  /**
    * Run an immediate heartbeat from all actors. Used by tests.
    */
   @VisibleForTesting

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Wed Dec 21 04:32:40 2011
@@ -281,8 +281,18 @@ class BPServiceActor implements Runnable
    */
   @VisibleForTesting
   void triggerBlockReportForTests() throws IOException {
+    synchronized (receivedAndDeletedBlockList) {
       lastBlockReport = 0;
-      blockReport();
+      lastHeartbeat = 0;
+      receivedAndDeletedBlockList.notifyAll();
+      while (lastBlockReport == 0) {
+        try {
+          receivedAndDeletedBlockList.wait(100);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
   }
   
   @VisibleForTesting
@@ -290,6 +300,29 @@ class BPServiceActor implements Runnable
     synchronized (receivedAndDeletedBlockList) {
       lastHeartbeat = 0;
       receivedAndDeletedBlockList.notifyAll();
+      while (lastHeartbeat == 0) {
+        try {
+          receivedAndDeletedBlockList.wait(100);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void triggerDeletionReportForTests() throws IOException {
+    synchronized (receivedAndDeletedBlockList) {
+      lastDeletedReport = 0;
+      receivedAndDeletedBlockList.notifyAll();
+
+      while (lastDeletedReport == 0) {
+        try {
+          receivedAndDeletedBlockList.wait(100);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
     }
   }
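
All three trigger methods share one idiom: zero the timestamp, wake the
actor thread, then poll until the actor has done the work and stamped
the timestamp again. A self-contained sketch of that idiom (hypothetical
names; the real code keys off lastBlockReport, lastHeartbeat and
lastDeletedReport):

    class TriggerSketch {
      private final Object lock = new Object();
      private long lastRun = System.currentTimeMillis();

      /** Test-thread side: request a run and wait for it to happen. */
      void triggerAndWait() throws InterruptedException {
        synchronized (lock) {
          lastRun = 0;                    // request an immediate run
          lock.notifyAll();               // wake the worker if it is waiting
          while (lastRun == 0) {          // poll until the worker has run
            lock.wait(100);
          }
        }
      }

      /** Worker side: run whenever the timestamp has been zeroed. */
      void workerLoop() throws InterruptedException {
        synchronized (lock) {
          while (true) {
            if (lastRun == 0) {
              // ... perform the report/heartbeat work here ...
              lastRun = System.currentTimeMillis();
              lock.notifyAll();
            }
            lock.wait(100);
          }
        }
      }

      public static void main(String[] args) throws InterruptedException {
        final TriggerSketch s = new TriggerSketch();
        Thread worker = new Thread(new Runnable() {
          public void run() {
            try { s.workerLoop(); } catch (InterruptedException ignored) { }
          }
        });
        worker.setDaemon(true);
        worker.start();
        s.triggerAndWait();
        System.out.println("worker completed one triggered run");
      }
    }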
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Wed Dec 21 04:32:40 2011
@@ -107,6 +107,14 @@ class FSDatasetAsyncDiskService {
     
   }
   
+  synchronized long countPendingDeletions() {
+    long count = 0;
+    for (ThreadPoolExecutor exec : executors.values()) {
+      count += exec.getTaskCount() - exec.getCompletedTaskCount();
+    }
+    return count;
+  }
+  
   /**
    * Execute the task sometime in the future, using ThreadPools.
    */
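
countPendingDeletions() derives the backlog from ThreadPoolExecutor's
counters; per the JDK javadoc both getTaskCount() and
getCompletedTaskCount() are approximations, so the difference is a
snapshot rather than an exact figure. A standalone illustration of the
same arithmetic (hypothetical demo, not HDFS code):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    class PendingCountDemo {
      public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(1, 1, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
          exec.execute(new Runnable() {
            public void run() {           // simulate a slow block deletion
              try { Thread.sleep(50); } catch (InterruptedException e) { }
            }
          });
        }
        // Same arithmetic as countPendingDeletions(), for one executor:
        long pending = exec.getTaskCount() - exec.getCompletedTaskCount();
        System.out.println("pending deletions ~= " + pending);
        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.SECONDS);
      }
    }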

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Dec 21 04:32:40 2011
@@ -71,6 +71,7 @@ import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
@@ -508,6 +509,17 @@ public class FSNamesystem implements Nam
             "taking over writer role in edits logs.");
         editLogTailer.catchupDuringFailover();
         
+        LOG.info("Reprocessing replication and invalidation queues...");
+        blockManager.getDatanodeManager().markAllDatanodesStale();
+        blockManager.clearQueues();
+        blockManager.processMisReplicatedBlocks();
+        
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NameNode metadata after re-processing " +
+              "replication and invalidation queues during failover:\n" +
+              metaSaveAsString());
+        }
+        
         long nextTxId = dir.fsImage.getLastAppliedTxId() + 1;
         LOG.info("Will take over writing edit logs at txnid " + 
             nextTxId);
@@ -523,7 +535,7 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
   }
-  
+
   /** 
    * Stop services required in active state
    * @throws InterruptedException
@@ -781,14 +793,7 @@ public class FSNamesystem implements Nam
       File file = new File(System.getProperty("hadoop.log.dir"), filename);
       PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
           true)));
-  
-      long totalInodes = this.dir.totalInodes();
-      long totalBlocks = this.getBlocksTotal();
-      out.println(totalInodes + " files and directories, " + totalBlocks
-          + " blocks = " + (totalInodes + totalBlocks) + " total");
-
-      blockManager.metaSave(out);
-
+      metaSave(out);
       out.flush();
       out.close();
     } finally {
@@ -796,6 +801,25 @@ public class FSNamesystem implements Nam
     }
   }
 
+  private void metaSave(PrintWriter out) {
+    assert hasWriteLock();
+    long totalInodes = this.dir.totalInodes();
+    long totalBlocks = this.getBlocksTotal();
+    out.println(totalInodes + " files and directories, " + totalBlocks
+        + " blocks = " + (totalInodes + totalBlocks) + " total");
+
+    blockManager.metaSave(out);
+  }
+
+  private String metaSaveAsString() {
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    metaSave(pw);
+    pw.flush();
+    return sw.toString();
+  }
+  
+
   long getDefaultBlockSize() {
     return serverDefaults.getBlockSize();
   }
@@ -3605,6 +3629,9 @@ public class FSNamesystem implements Nam
 
   @Override
   public boolean isPopulatingReplQueues() {
+    if (!haContext.getState().shouldPopulateReplQueues()) {
+      return false;
+    }
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
@@ -3939,6 +3966,11 @@ public class FSNamesystem implements Nam
   }
   
   @Metric
+  public long getPostponedMisreplicatedBlocks() {
+    return blockManager.getPostponedMisreplicatedBlocksCount();
+  }
+  
+  @Metric
   public int getBlockCapacity() {
     return blockManager.getCapacity();
   }
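
For readability, the order of operations this hunk adds to the
active-state transition is, in outline (annotated excerpt from the code
above, not standalone):

    // Inside the failover path, under the namesystem write lock:
    editLogTailer.catchupDuringFailover();                     // 1. apply remaining edits
    blockManager.getDatanodeManager().markAllDatanodesStale(); // 2. distrust DN contents
    blockManager.clearQueues();                                // 3. drop the old active's
                                                               //    queued decisions
    blockManager.processMisReplicatedBlocks();                 // 4. rebuild the queues,
                                                               //    postponing any block
                                                               //    with a stale replica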

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ActiveState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ActiveState.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ActiveState.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ActiveState.java Wed Dec 21 04:32:40 2011
@@ -42,6 +42,11 @@ public class ActiveState extends HAState
   }
   
   @Override
+  public boolean shouldPopulateReplQueues() {
+    return true;
+  }
+  
+  @Override
   public void setState(HAContext context, HAState s) throws ServiceFailedException {
     if (s == NameNode.STANDBY_STATE) {
       setStateInternal(context, s);
@@ -67,4 +72,5 @@ public class ActiveState extends HAState
       throw new ServiceFailedException("Failed to stop active services", e);
     }
   }
+
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java Wed Dec 21 04:32:40 2011
@@ -106,9 +106,12 @@ abstract public class HAState {
   public abstract void checkOperation(final HAContext context, final OperationCategory op)
       throws StandbyException;
 
+  public abstract boolean shouldPopulateReplQueues();
+
   /**
    * @return String representation of the service state.
    */
+  @Override
   public String toString() {
     return state.toString();
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java Wed Dec 21 04:32:40 2011
@@ -80,5 +80,10 @@ public class StandbyState extends HAStat
         + context.getState();
     throw new StandbyException(msg);
   }
+
+  @Override
+  public boolean shouldPopulateReplQueues() {
+    return false;
+  }
 }
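
Together with the FSNamesystem change above, the HA state now gates
replication-queue population before safe mode is even consulted. A
hypothetical standalone restatement of the combined predicate:

    class ReplQueueGateSketch {
      /** Sketch of FSNamesystem.isPopulatingReplQueues() after this patch. */
      static boolean isPopulatingReplQueues(boolean activeState,
                                            boolean safeModeAllowsReplQueues) {
        if (!activeState) {
          return false;                   // StandbyState.shouldPopulateReplQueues()
        }
        return safeModeAllowsReplQueues;  // ActiveState: defer to safe-mode logic
      }
    }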
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Dec 21 04:32:40 2011
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.protocolR2
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
@@ -1574,6 +1575,30 @@ public class MiniDFSCluster {
       ServiceFailedException {
     getHaServiceClient(nnIndex).transitionToStandby();
   }
+  
+  
+  public void triggerBlockReports()
+      throws IOException {
+    for (DataNode dn : getDataNodes()) {
+      DataNodeAdapter.triggerBlockReport(dn);
+    }
+  }
+
+
+  public void triggerDeletionReports()
+      throws IOException {
+    for (DataNode dn : getDataNodes()) {
+      DataNodeAdapter.triggerDeletionReport(dn);
+    }
+  }
+
+  public void triggerHeartbeats()
+      throws IOException {
+    for (DataNode dn : getDataNodes()) {
+      DataNodeAdapter.triggerHeartbeat(dn);
+    }
+  }
+
 
   /** Wait until the given namenode gets registration from all the datanodes */
   public void waitActive(int nnIndex) throws IOException {
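
A hedged example of how a test might combine the three new helpers to
force a full DN-to-NN synchronization (hypothetical test utility; it
assumes an HA-enabled MiniDFSCluster is already running):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    class TriggerHelpersExample {
      static void syncDataNodes(MiniDFSCluster cluster) throws IOException {
        cluster.triggerHeartbeats();       // deliver queued commands to the DNs
        cluster.triggerDeletionReports();  // report completed deletions to the NN
        cluster.triggerBlockReports();     // full reports clear per-DN staleness
      }
    }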

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Wed Dec 21 04:32:40 2011
@@ -122,4 +122,26 @@ public class BlockManagerTestUtil {
     return blockManager.computeDatanodeWork();
   }
   
+  public static int computeInvalidationWork(BlockManager bm) {
+    return bm.computeInvalidateWork(Integer.MAX_VALUE);
+  }
+  
+  /**
+   * Compute all the replication and invalidation work for the
+   * given BlockManager.
+   * 
+   * This differs from the above functions in that it computes
+   * replication work for all DNs rather than a particular subset,
+   * regardless of invalidation/replication limit configurations.
+   * 
+   * NB: you may want to set
+   * {@link DFSConfigKeys#DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to
+   * a high value to ensure that all work is calculated.
+   */
+  public static int computeAllPendingWork(BlockManager bm)
+    throws IOException {
+    int work = computeInvalidationWork(bm);
+    work += bm.computeReplicationWork(Integer.MAX_VALUE);
+    return work;
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java Wed Dec 21 04:32:40 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.IOException;
+
 /**
  * WARNING!! This is TEST ONLY class: it never has to be used
  * for ANY development purposes.
@@ -42,4 +44,27 @@ public class DataNodeAdapter {
       boolean heartbeatsDisabledForTests) {
     dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
   }
+
+  public static void triggerDeletionReport(DataNode dn) throws IOException {
+    for (BPOfferService bpos : dn.getAllBpOs()) {
+      bpos.triggerDeletionReportForTests();
+    }
+  }
+
+  public static void triggerHeartbeat(DataNode dn) throws IOException {
+    for (BPOfferService bpos : dn.getAllBpOs()) {
+      bpos.triggerHeartbeatForTests();
+    }
+  }
+  
+  public static void triggerBlockReport(DataNode dn) throws IOException {
+    for (BPOfferService bpos : dn.getAllBpOs()) {
+      bpos.triggerBlockReportForTests();
+    }
+  }
+
+  public static long getPendingAsyncDeletions(DataNode dn) {
+    FSDataset fsd = (FSDataset)dn.getFSDataset();
+    return fsd.asyncDiskService.countPendingDeletions();
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Wed Dec 21 04:32:40 2011
@@ -81,6 +81,11 @@ public class NameNodeAdapter {
     namenode.getNamesystem().leaveSafeMode(checkForUpgrades);
   }
   
+  public static void abortEditLogs(NameNode nn) {
+    FSEditLog el = nn.getFSImage().getEditLog();
+    el.abortCurrentLogSegment();
+  }
+  
   /**
    * Get the internal RPC server instance.
    * @return rpc server

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java?rev=1221608&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java Wed Dec 21 04:32:40 2011
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+
+
+public class TestDNFencing {
+  
+  protected static final Log LOG = LogFactory.getLog(
+      TestDNFencing.class);
+  private static final String TEST_FILE_DATA = "hello highly available world";
+  private static final String TEST_FILE = "/testStandbyIsHot";
+  private static final Path TEST_FILE_PATH = new Path(TEST_FILE);
+  private static final int SMALL_BLOCK = 1024;
+  
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private NameNode nn1, nn2;
+  private FileSystem fs;
+
+  static {
+    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(BlockManager.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+  }
+  
+  @Before
+  public void setupCluster() throws Exception {
+    conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, SMALL_BLOCK);
+    // Bump up replication interval so that we only run replication
+    // checks explicitly.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 600);
+    // Increase max streams so that we re-replicate quickly.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
+    // See RandomDeleterPolicy javadoc.
+    conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class,
+        BlockPlacementPolicy.class); 
+    cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(3)
+      .build();
+    nn1 = cluster.getNameNode(0);
+    nn2 = cluster.getNameNode(1);
+    
+    cluster.waitActive();
+    cluster.transitionToActive(0);
+    // Trigger block reports so that the first NN trusts all
+    // of the DNs, and will issue deletions
+    cluster.triggerBlockReports();
+    nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
+    nn2.getNamesystem().getEditLogTailer().interrupt();
+    fs = TestDFSClientFailover.configureFailoverFs(cluster, conf);
+  }
+  
+  @After
+  public void shutdownCluster() throws Exception {
+    if (cluster != null) {
+      banner("Shutting down cluster. NN1 metadata:");
+      doMetasave(nn1);
+      banner("Shutting down cluster. NN2 metadata:");
+      doMetasave(nn2);
+      cluster.shutdown();
+    }
+  }
+  
+
+  @Test
+  public void testDnFencing() throws Exception {
+    // Create a file with replication level 3.
+    DFSTestUtil.createFile(fs, TEST_FILE_PATH, 30*SMALL_BLOCK, (short)3, 1L);
+    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
+    
+    // Drop its replication count to 1, so it becomes over-replicated.
+    // Then compute the invalidation of the extra blocks and trigger
+    // heartbeats so the invalidations are flushed to the DNs.
+    nn1.getRpcServer().setReplication(TEST_FILE, (short) 1);
+    BlockManagerTestUtil.computeInvalidationWork(
+        nn1.getNamesystem().getBlockManager());
+    cluster.triggerHeartbeats();
+    
+    // Transition nn2 to active even though nn1 still thinks it's active.
+    banner("Failing to NN2 but let NN1 continue to think it's active");
+    NameNodeAdapter.abortEditLogs(nn1);
+    NameNodeAdapter.enterSafeMode(nn1, false);
+    cluster.transitionToActive(1);
+    
+    // Check that the standby picked up the replication change.
+    assertEquals(1,
+        nn2.getRpcServer().getFileInfo(TEST_FILE).getReplication());
+
+    // Dump some info for debugging purposes.
+    banner("NN2 Metadata immediately after failover");
+    doMetasave(nn2);
+    
+    // Even though NN2 considers the blocks over-replicated, it should
+    // postpone the block invalidation because the DNs are still "stale".
+    assertEquals(30, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
+    
+    banner("Triggering heartbeats and block reports so that fencing is completed");
+    cluster.triggerHeartbeats();
+    cluster.triggerBlockReports();
+    
+    banner("Metadata after nodes have all block-reported");
+    doMetasave(nn2);
+    
+    // The blocks should no longer be postponed.
+    assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
+    
+    // Wait for NN2 to enact its deletions (replication monitor has to run, etc.)
+    BlockManagerTestUtil.computeInvalidationWork(
+        nn2.getNamesystem().getBlockManager());
+    cluster.triggerHeartbeats();
+    waitForDNDeletions(cluster);
+    cluster.triggerDeletionReports();
+    assertEquals(0, nn2.getNamesystem().getUnderReplicatedBlocks());
+    assertEquals(0, nn2.getNamesystem().getPendingReplicationBlocks());
+    
+    banner("Making sure the file is still readable");
+    FileSystem fs2 = cluster.getFileSystem(1);
+    DFSTestUtil.readFile(fs2, TEST_FILE_PATH);
+
+    banner("Waiting for the actual block files to get deleted from DNs.");
+    waitForTrueReplication(cluster, block, 1);
+  }
+  
+  /**
+   * Test case which restarts the standby node in such a way that,
+   * when it exits safemode, it will want to invalidate a bunch
+   * of over-replicated block replicas. Ensures that if we fail over
+   * at this point it won't lose data.
+   */
+  @Test
+  public void testNNClearsCommandsOnFailoverAfterStartup()
+      throws Exception {
+    // Make lots of blocks to increase chances of triggering a bug.
+    DFSTestUtil.createFile(fs, TEST_FILE_PATH, 30*SMALL_BLOCK, (short)3, 1L);
+
+    banner("Shutting down NN2");
+    cluster.shutdownNameNode(1);
+
+    banner("Setting replication to 1, rolling edit log.");
+    nn1.getRpcServer().setReplication(TEST_FILE, (short) 1);
+    nn1.getRpcServer().rollEditLog();
+    
+    // Start NN2 again. When it starts up, it will see all of the
+    // blocks as over-replicated, since it has the metadata for
+    // replication=1, but the DNs haven't yet processed the deletions.
+    banner("Starting NN2 again.");
+    cluster.restartNameNode(1);
+    nn2 = cluster.getNameNode(1);
+    
+    banner("triggering BRs");
+    cluster.triggerBlockReports();
+
+    // We expect that both NN1 and NN2 will have some number of
+    // deletions queued up for the DNs.
+    banner("computing invalidation on nn1");
+    BlockManagerTestUtil.computeInvalidationWork(
+        nn1.getNamesystem().getBlockManager());
+
+    banner("computing invalidation on nn2");
+    BlockManagerTestUtil.computeInvalidationWork(
+        nn2.getNamesystem().getBlockManager());
+    
+    // Dump some info for debugging purposes.
+    banner("Metadata immediately before failover");
+    doMetasave(nn2);
+
+
+    // Transition nn2 to active even though nn1 still thinks it's active
+    banner("Failing to NN2 but let NN1 continue to think it's active");
+    NameNodeAdapter.abortEditLogs(nn1);
+    NameNodeAdapter.enterSafeMode(nn1, false);
+
+    cluster.transitionToActive(1);
+
+    // Check that the standby picked up the replication change.
+    assertEquals(1,
+        nn2.getRpcServer().getFileInfo(TEST_FILE).getReplication());
+
+    // Dump some info for debugging purposes.
+    banner("Metadata immediately after failover");
+    doMetasave(nn2);
+    
+    banner("Triggering heartbeats and block reports so that fencing is completed");
+    cluster.triggerHeartbeats();
+    cluster.triggerBlockReports();
+    
+    banner("Metadata after nodes have all block-reported");
+    doMetasave(nn2);
+    
+    // The blocks should no longer be postponed.
+    assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
+    
+    // Wait for NN2 to enact its deletions (replication monitor has to run, etc.)
+    BlockManagerTestUtil.computeInvalidationWork(
+        nn2.getNamesystem().getBlockManager());
+
+    waitForNNToIssueDeletions(nn2);
+    cluster.triggerHeartbeats();
+    waitForDNDeletions(cluster);
+    cluster.triggerDeletionReports();
+    assertEquals(0, nn2.getNamesystem().getUnderReplicatedBlocks());
+    assertEquals(0, nn2.getNamesystem().getPendingReplicationBlocks());
+    
+    banner("Making sure the file is still readable");
+    FileSystem fs2 = cluster.getFileSystem(1);
+    DFSTestUtil.readFile(fs2, TEST_FILE_PATH);
+  }
+  
+  /**
+   * Test case that reduces replication of a file with a lot of blocks
+   * and then fails over right after those blocks enter the DN invalidation
+   * queues on the active. Ensures that fencing is correct and no replicas
+   * are lost.
+   */
+  @Test
+  public void testNNClearsCommandsOnFailoverWithReplChanges()
+      throws Exception {
+    // Make lots of blocks to increase chances of triggering a bug.
+    DFSTestUtil.createFile(fs, TEST_FILE_PATH, 30*SMALL_BLOCK, (short)1, 1L);
+
+    banner("rolling NN1's edit log, forcing catch-up");
+    TestEditLogTailer.waitForStandbyToCatchUp(nn1, nn2);
+    
+    // Get some new replicas reported so that NN2 now considers
+    // them over-replicated and schedules some more deletions
+    nn1.getRpcServer().setReplication(TEST_FILE, (short) 2);
+    while (BlockManagerTestUtil.getComputedDatanodeWork(
+        nn1.getNamesystem().getBlockManager()) > 0) {
+      LOG.info("Getting more replication work computed");
+    }
+    BlockManager bm1 = nn1.getNamesystem().getBlockManager();
+    while (bm1.getPendingReplicationBlocksCount() > 0) {
+      BlockManagerTestUtil.updateState(bm1);
+      cluster.triggerHeartbeats();
+      Thread.sleep(1000);
+    }
+    
+    banner("triggering BRs");
+    cluster.triggerBlockReports();
+    
+    nn1.getRpcServer().setReplication(TEST_FILE, (short) 1);
+
+    
+    banner("computing invalidation on nn1");
+
+    BlockManagerTestUtil.computeInvalidationWork(
+        nn1.getNamesystem().getBlockManager());
+    doMetasave(nn1);
+
+    banner("computing invalidation on nn2");
+    BlockManagerTestUtil.computeInvalidationWork(
+        nn2.getNamesystem().getBlockManager());
+    doMetasave(nn2);
+
+    // Dump some info for debugging purposes.
+    banner("Metadata immediately before failover");
+    doMetasave(nn2);
+
+
+    // Transition nn2 to active even though nn1 still thinks it's active
+    banner("Failing to NN2 but let NN1 continue to think it's active");
+    NameNodeAdapter.abortEditLogs(nn1);
+    NameNodeAdapter.enterSafeMode(nn1, false);
+
+    
+    BlockManagerTestUtil.computeInvalidationWork(
+        nn2.getNamesystem().getBlockManager());
+    cluster.transitionToActive(1);
+
+    // Check that the standby picked up the replication change.
+    assertEquals(1,
+        nn2.getRpcServer().getFileInfo(TEST_FILE).getReplication());
+
+    // Dump some info for debugging purposes.
+    banner("Metadata immediately after failover");
+    doMetasave(nn2);
+    
+    banner("Triggering heartbeats and block reports so that fencing is completed");
+    cluster.triggerHeartbeats();
+    cluster.triggerBlockReports();
+    
+    banner("Metadata after nodes have all block-reported");
+    doMetasave(nn2);
+    
+    // The blocks should no longer be postponed.
+    assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
+    
+    // Wait for NN2 to enact its deletions (replication monitor has to run, etc.)
+    BlockManagerTestUtil.computeInvalidationWork(
+        nn2.getNamesystem().getBlockManager());
+
+    waitForNNToIssueDeletions(nn2);
+    cluster.triggerHeartbeats();
+    waitForDNDeletions(cluster);
+    cluster.triggerDeletionReports();
+    assertEquals(0, nn2.getNamesystem().getUnderReplicatedBlocks());
+    assertEquals(0, nn2.getNamesystem().getPendingReplicationBlocks());
+    
+    banner("Making sure the file is still readable");
+    FileSystem fs2 = cluster.getFileSystem(1);
+    DFSTestUtil.readFile(fs2, TEST_FILE_PATH);
+  }
+
+  /**
+   * Print a big banner in the test log to make debugging easier.
+   */
+  private void banner(String string) {
+    LOG.info("\n\n\n\n================================================\n" +
+        string + "\n" +
+        "==================================================\n\n");
+  }
+
+  private void doMetasave(NameNode nn) {
+    nn.getNamesystem().writeLock();
+    try {
+      PrintWriter pw = new PrintWriter(System.err);
+      nn.getNamesystem().getBlockManager().metaSave(pw);
+      pw.flush();
+    } finally {
+      nn.getNamesystem().writeUnlock();
+    }
+  }
+
+  private void waitForTrueReplication(final MiniDFSCluster cluster,
+      final ExtendedBlock block, final int waitFor) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return getTrueReplication(cluster, block) == waitFor;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }, 500, 10000);
+  }
+
+  private int getTrueReplication(MiniDFSCluster cluster, ExtendedBlock block)
+      throws IOException {
+    int count = 0;
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getFSDataset().getStoredBlock(block.getBlockPoolId(), block.getBlockId()) != null) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  private void waitForDNDeletions(final MiniDFSCluster cluster)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (DataNode dn : cluster.getDataNodes()) {
+          if (DataNodeAdapter.getPendingAsyncDeletions(dn) > 0) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, 1000, 10000);
+    
+  }
+
+  private void waitForNNToIssueDeletions(final NameNode nn)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("Waiting for NN to issue block deletions to DNs");
+        return nn.getNamesystem().getBlockManager().getPendingDeletionBlocksCount() == 0;
+      }
+    }, 250, 10000);
+  }
+
+  /**
+   * A BlockPlacementPolicy which, rather than using available space, makes
+   * random decisions about which excess replica to delete. This is needed
+   * because, with the default policy, the two NNs will usually (but not
+   * quite always) choose the same replica to delete, masking fencing bugs.
+   * The issues are exacerbated when the two NNs make different decisions,
+   * which can happen in "real life" when they have slightly out-of-sync
+   * heartbeat information regarding disk usage.
+   */
+  public static class RandomDeleterPolicy extends BlockPlacementPolicyDefault {
+
+    public RandomDeleterPolicy() {
+      super();
+    }
+
+    @Override
+    public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+        Block block, short replicationFactor,
+        Collection<DatanodeDescriptor> first,
+        Collection<DatanodeDescriptor> second) {
+      
+      Collection<DatanodeDescriptor> chooseFrom =
+        !first.isEmpty() ? first : second;
+
+      List<DatanodeDescriptor> l = Lists.newArrayList(chooseFrom);
+      return l.get(DFSUtil.getRandom().nextInt(l.size()));
+    }
+  }
+
+}
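
RandomDeleterPolicy is wired in through HDFS's pluggable block placement
mechanism: setupCluster() sets the dfs.block.replicator.classname key
before the cluster (and hence the BlockManager) is built. A minimal
standalone sketch of that registration, assuming the same imports used
by TestDNFencing:

    // Install the randomized deleter before the BlockManager is created.
    Configuration conf = new Configuration();
    conf.setClass("dfs.block.replicator.classname",
        TestDNFencing.RandomDeleterPolicy.class, BlockPlacementPolicy.class);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())
        .numDataNodes(3)
        .build();

The policy class is instantiated reflectively, which is presumably why
RandomDeleterPolicy declares an explicit public no-arg constructor.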

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java?rev=1221608&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java Wed Dec 21 04:32:40 2011
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+
+
+/**
+ * Stress-test for potential bugs when replication is changing
+ * on blocks during a failover.
+ */
+public class TestDNFencingWithReplication {
+  static {
+    ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
+    ((Log4JLogger)Server.LOG).getLogger().setLevel(Level.FATAL);
+    ((Log4JLogger)LogFactory.getLog(
+        "org.apache.hadoop.io.retry.RetryInvocationHandler"))
+        .getLogger().setLevel(Level.FATAL);
+  }
+
+  private static final int NUM_THREADS = 20;
+  // How long the test should try to run. In practice it runs
+  // ~20-30s longer than this constant due to startup and
+  // shutdown time.
+  private static final long RUNTIME = 35000;
+  private static final int BLOCK_SIZE = 1024;
+  
+  private static class ReplicationToggler extends RepeatingTestThread {
+    private final FileSystem fs;
+    private final Path path;
+
+    public ReplicationToggler(TestContext ctx, FileSystem fs, Path p) {
+      super(ctx);
+      this.fs = fs;
+      this.path = p;
+    }
+
+    @Override
+    public void doAnAction() throws Exception {
+      fs.setReplication(path, (short)1);
+      waitForReplicas(1);
+      fs.setReplication(path, (short)2);
+      waitForReplicas(2);
+    }
+    
+    private void waitForReplicas(final int replicas) throws Exception {
+      try {
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            try {
+              BlockLocation[] blocks = fs.getFileBlockLocations(path, 0, 10);
+              Assert.assertEquals(1, blocks.length);
+              return blocks[0].getHosts().length == replicas;
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }, 100, 60000);
+      } catch (TimeoutException te) {
+        throw new IOException("Timed out waiting for " + replicas + " replicas " +
+            "on path " + path);
+      }
+    }
+    
+    public String toString() {
+      return "Toggler for " + path;
+    }
+  }
+  
+  @Test
+  public void testFencingStress() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    // Increase max streams so that we re-replicate quickly.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(3)
+      .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      
+      final NameNode nn1 = cluster.getNameNode(0);
+      final NameNode nn2 = cluster.getNameNode(1);
+      nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
+      nn2.getNamesystem().getEditLogTailer().interrupt();
+      
+      FileSystem fs = TestDFSClientFailover.configureFailoverFs(
+          cluster, conf);
+      TestContext togglers = new TestContext();
+      for (int i = 0; i < NUM_THREADS; i++) {
+        Path p = new Path("/test-" + i);
+        DFSTestUtil.createFile(fs, p, BLOCK_SIZE*10, (short)3, (long)i);
+        togglers.addThread(new ReplicationToggler(togglers, fs, p));
+      }
+      
+      // Start a separate thread which will make sure that replication
+      // happens quickly by triggering deletion reports and replication
+      // work calculation frequently.
+      TestContext triggerCtx = new TestContext();
+      triggerCtx.addThread(new RepeatingTestThread(triggerCtx) {
+        
+        @Override
+        public void doAnAction() throws Exception {
+          for (DataNode dn : cluster.getDataNodes()) {
+            DataNodeAdapter.triggerDeletionReport(dn);
+            DataNodeAdapter.triggerHeartbeat(dn);
+          }
+          for (int i = 0; i < 2; i++) {
+            NameNode nn = cluster.getNameNode(i);
+            BlockManagerTestUtil.computeAllPendingWork(
+                nn.getNamesystem().getBlockManager());
+          }
+          Thread.sleep(500);
+        }
+      });
+      
+      triggerCtx.addThread(new RepeatingTestThread(triggerCtx) {
+        
+        @Override
+        public void doAnAction() throws Exception {
+          System.err.println("==============================\n" +
+              "Failing over from 0->1\n" +
+              "==================================");
+          cluster.transitionToStandby(0);
+          cluster.transitionToActive(1);
+          
+          Thread.sleep(5000);
+          System.err.println("==============================\n" +
+              "Failing over from 1->0\n" +
+              "==================================");
+
+          cluster.transitionToStandby(1);
+          cluster.transitionToActive(0);
+          Thread.sleep(5000);
+        }
+      });
+      
+      triggerCtx.startThreads();
+      togglers.startThreads();
+      
+      togglers.waitFor(RUNTIME);
+      togglers.stop();
+      triggerCtx.stop();
+
+      // Check that the files can be read without throwing an exception
+      for (int i = 0; i < NUM_THREADS; i++) {
+        Path p = new Path("/test-" + i);
+        DFSTestUtil.readFile(fs, p);
+      }
+    } finally {
+      System.err.println("===========================\n\n\n\n");
+      cluster.shutdown();
+    }
+
+  }
+}
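
The stress test leans on MultithreadedTestUtil: each RepeatingTestThread
loops doAnAction() until its TestContext is stopped, and an exception in
any thread fails the whole context when waitFor() or stop() is called.
The core idiom, reduced to a sketch (the action body and timeout are
illustrative):

    TestContext ctx = new TestContext();
    ctx.addThread(new RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        // one iteration of test work goes here
        Thread.sleep(100);
      }
    });
    ctx.startThreads();
    ctx.waitFor(10000); // run for ~10s, rethrowing any thread's failure
    ctx.stop();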

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java Wed Dec 21 04:32:40 2011
@@ -108,8 +108,7 @@ public class TestEditLogTailer {
     long activeTxId = active.getNamesystem().getFSImage().getEditLog()
       .getLastWrittenTxId();
     
-    // TODO: we should really just ask for a log roll here
-    doSaveNamespace(active);
+    active.getRpcServer().rollEditLog();
     
     long start = System.currentTimeMillis();
     while (System.currentTimeMillis() - start < NN_LAG_TIMEOUT) {
@@ -124,12 +123,4 @@ public class TestEditLogTailer {
         " (currently at " +
         standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")");
   }
-  
-  private static void doSaveNamespace(NameNode nn)
-      throws IOException {
-    NameNodeAdapter.enterSafeMode(nn, false);
-    NameNodeAdapter.saveNamespace(nn);
-    NameNodeAdapter.leaveSafeMode(nn, false);
-  }
-  
 }
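
The change above swaps the heavyweight enter-safe-mode/saveNamespace/
leave-safe-mode dance for a simple edit log roll: rolling finalizes the
in-progress segment, which is what the standby's tailer consumes. A
hedged sketch of a catch-up helper built from the same calls shown in
the hunk (the helper name and 10-second timeout are illustrative):

    // Roll the active's log, then poll until the standby has applied
    // everything up to the pre-roll transaction ID.
    static void waitForCatchUp(NameNode active, NameNode standby)
        throws Exception {
      long activeTxId = active.getNamesystem().getFSImage().getEditLog()
          .getLastWrittenTxId();
      active.getRpcServer().rollEditLog(); // finalize the current segment
      long deadline = System.currentTimeMillis() + 10000;
      while (standby.getNamesystem().getFSImage().getLastAppliedTxId()
          < activeTxId) {
        if (System.currentTimeMillis() > deadline) {
          throw new java.util.concurrent.TimeoutException(
              "Standby did not catch up to txid " + activeTxId);
        }
        Thread.sleep(100);
      }
    }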

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java Wed Dec 21 04:32:40 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutExcep
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,10 +34,17 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.base.Supplier;
@@ -52,6 +60,12 @@ public class TestStandbyIsHot {
   private static final String TEST_FILE = "/testStandbyIsHot";
   private static final Path TEST_FILE_PATH = new Path(TEST_FILE);
 
+  static {
+    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(BlockManager.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+  }
+
   @Test
   public void testStandbyIsHot() throws Exception {
     Configuration conf = new Configuration();
@@ -79,19 +93,40 @@ public class TestStandbyIsHot {
       nn1.getRpcServer().rollEditLog();
       System.err.println("==================================");
 
-      waitForBlockLocations(nn2, TEST_FILE, 3);
-      
-      nn1.stop();
-      cluster.transitionToActive(1);
+      // Block locations should show up on standby.
+      LOG.info("Waiting for block locations to appear on standby node");
+      waitForBlockLocations(cluster, nn2, TEST_FILE, 3);
+
+      // Trigger immediate heartbeats and block reports so
+      // that the active "trusts" all of the DNs
+      cluster.triggerHeartbeats();
+      cluster.triggerBlockReports();
+
+      // Change replication
+      LOG.info("Changing replication to 1");
+      fs.setReplication(TEST_FILE_PATH, (short)1);
+      waitForBlockLocations(cluster, nn1, TEST_FILE, 1);
 
-      assertEquals(TEST_FILE_DATA, DFSTestUtil.readFile(fs, TEST_FILE_PATH));
+      nn1.getRpcServer().rollEditLog();
+      
+      LOG.info("Waiting for lowered replication to show up on standby");
+      waitForBlockLocations(cluster, nn2, TEST_FILE, 1);
+      
+      // Change back to 3
+      LOG.info("Changing replication to 3");
+      fs.setReplication(TEST_FILE_PATH, (short)3);
+      nn1.getRpcServer().rollEditLog();
+      
+      LOG.info("Waiting for higher replication to show up on standby");
+      waitForBlockLocations(cluster, nn2, TEST_FILE, 3);
       
     } finally {
       cluster.shutdown();
     }
   }
 
-  private void waitForBlockLocations(final NameNode nn,
+  static void waitForBlockLocations(final MiniDFSCluster cluster,
+      final NameNode nn,
       final String path, final int expectedReplicas)
       throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
@@ -100,8 +135,19 @@ public class TestStandbyIsHot {
       public Boolean get() {
         try {
           LocatedBlocks locs = NameNodeAdapter.getBlockLocations(nn, path, 0, 1000);
-          LOG.info("Got locs: " + locs);
-          return locs.getLastLocatedBlock().getLocations().length == expectedReplicas;
+          DatanodeInfo[] dnis = locs.getLastLocatedBlock().getLocations();
+          for (DatanodeInfo dni : dnis) {
+            Assert.assertNotNull(dni);
+          }
+          int numReplicas = dnis.length;
+          
+          LOG.info("Got " + numReplicas + " locs: " + locs);
+          if (numReplicas > expectedReplicas) {
+            for (DataNode dn : cluster.getDataNodes()) {
+              DataNodeAdapter.triggerDeletionReport(dn);
+            }
+          }
+          return numReplicas == expectedReplicas;
         } catch (IOException e) {
           LOG.warn("No block locations yet: " + e.getMessage());
           return false;