You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2012/02/01 06:16:49 UTC

svn commit: r1238940 - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/serv...

Author: eli
Date: Wed Feb  1 05:16:49 2012
New Revision: 1238940

URL: http://svn.apache.org/viewvc?rev=1238940&view=rev
Log:
HDFS-2742. HA: observed dataloss in replication stress test. Contributed by Todd Lipcon

Added:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
Removed:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Wed Feb  1 05:16:49 2012
@@ -145,3 +145,5 @@ HDFS-2824. Fix failover when prior NN di
 HDFS-2853. HA: NN fails to start if the shared edits dir is marked required (atm via eli)
 
 HDFS-2845. SBN should not allow browsing of the file system via web UI. (Bikas Saha via atm)
+
+HDFS-2742. HA: observed dataloss in replication stress test. (todd via eli)

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Wed Feb  1 05:16:49 2012
@@ -180,7 +180,7 @@ public class BlockInfo extends Block imp
   /**
    * Count the number of data-nodes the block belongs to.
    */
-  int numNodes() {
+  public int numNodes() {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert triplets.length % 3 == 0 : "Malformed BlockInfo";
     for(int idx = getCapacity()-1; idx >= 0; idx--) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Feb  1 05:16:49 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.bl
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -28,6 +29,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.commons.logging.Log;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.U
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Util;
@@ -58,7 +61,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -69,7 +71,6 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
 
 /**
@@ -83,11 +84,20 @@ public class BlockManager {
   /** Default load factor of map */
   public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
 
+  private static final String QUEUE_REASON_CORRUPT_STATE =
+    "it has the wrong state or generation stamp";
+
+  private static final String QUEUE_REASON_FUTURE_GENSTAMP =
+    "generation stamp is in the future";
+
   private final Namesystem namesystem;
 
   private final DatanodeManager datanodeManager;
   private final HeartbeatManager heartbeatManager;
   private final BlockTokenSecretManager blockTokenSecretManager;
+  
+  private final PendingDataNodeMessages pendingDNMessages =
+    new PendingDataNodeMessages();
 
   private volatile long pendingReplicationBlocksCount = 0L;
   private volatile long corruptReplicaBlocksCount = 0L;
@@ -124,6 +134,10 @@ public class BlockManager {
   public long getPostponedMisreplicatedBlocksCount() {
     return postponedMisreplicatedBlocksCount;
   }
+  /** Used by metrics */
+  public int getPendingDataNodeMessageCount() {
+    return pendingDNMessages.count();
+  }
 
   /**replicationRecheckInterval is how often namenode checks for new replication work*/
   private final long replicationRecheckInterval;
@@ -479,12 +493,24 @@ public class BlockManager {
     if(curBlock.isComplete())
       return curBlock;
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
-    if (!force && ucBlock.numNodes() < minReplication)
+    int numNodes = ucBlock.numNodes();
+    if (!force && numNodes < minReplication)
       throw new IOException("Cannot complete block: " +
           "block does not satisfy minimal replication requirement.");
     BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
     // replace penultimate block in file
     fileINode.setBlock(blkIndex, completeBlock);
+    
+    // Since safe-mode only counts complete blocks, and we now have
+    // one more complete block, we need to adjust the total up, and
+    // also count it as safe, if we have at least the minimum replica
+    // count. (We may not have the minimum replica count yet if this is
+    // a "forced" completion when a file is getting closed by an
+    // OP_CLOSE edit on the standby).
+    namesystem.adjustSafeModeBlockTotals(0, 1);
+    namesystem.incrementSafeBlockCount(
+        Math.min(numNodes, minReplication));
+    
     // replace block in the blocksMap
     return blocksMap.replaceBlock(completeBlock);
   }
@@ -547,6 +573,14 @@ public class BlockManager {
       String datanodeId = dd.getStorageID();
       invalidateBlocks.remove(datanodeId, oldBlock);
     }
+    
+    // Adjust safe-mode totals, since under-construction blocks don't
+    // count in safe-mode.
+    namesystem.adjustSafeModeBlockTotals(
+        // decrement safe if we had enough
+        targets.length >= minReplication ? -1 : 0,
+        // always decrement total blocks
+        -1);
 
     final long fileLength = fileINode.computeContentSummary().getLength();
     final long pos = fileLength - ucBlock.getNumBytes();
@@ -1483,9 +1517,19 @@ public class BlockManager {
     assert (node.numBlocks() == 0);
     BlockReportIterator itBR = report.getBlockReportIterator();
 
+    boolean isStandby = namesystem.isInStandbyState();
+    
     while(itBR.hasNext()) {
       Block iblk = itBR.next();
       ReplicaState reportedState = itBR.getCurrentReplicaState();
+      
+      if (isStandby &&
+          namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {
+        queueReportedBlock(node, iblk, reportedState,
+            QUEUE_REASON_FUTURE_GENSTAMP);
+        continue;
+      }
+      
       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
       // If block does not belong to any file, we are done.
       if (storedBlock == null) continue;
@@ -1493,7 +1537,14 @@ public class BlockManager {
       // If block is corrupt, mark it and continue to next block.
       BlockUCState ucState = storedBlock.getBlockUCState();
       if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) {
-        markBlockAsCorrupt(storedBlock, node);
+        if (namesystem.isInStandbyState()) {
+          // In the Standby, we may receive a block report for a file that we
+          // just have an out-of-date gen-stamp or state for, for example.
+          queueReportedBlock(node, iblk, reportedState,
+              QUEUE_REASON_CORRUPT_STATE);
+        } else {
+          markBlockAsCorrupt(storedBlock, node);
+        }
         continue;
       }
       
@@ -1576,7 +1627,8 @@ public class BlockManager {
    * @param toCorrupt replicas with unexpected length or generation stamp;
    *        add to corrupt replicas
    * @param toUC replicas of blocks currently under construction
-   * @return
+   * @return the up-to-date stored block, if it should be kept.
+   *         Otherwise, null.
    */
   private BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
       final Block block, final ReplicaState reportedState, 
@@ -1591,6 +1643,13 @@ public class BlockManager {
           + " replicaState = " + reportedState);
     }
   
+    if (namesystem.isInStandbyState() &&
+        namesystem.isGenStampInFuture(block.getGenerationStamp())) {
+      queueReportedBlock(dn, block, reportedState,
+          QUEUE_REASON_FUTURE_GENSTAMP);
+      return null;
+    }
+    
     // find block by blockId
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if(storedBlock == null) {
@@ -1615,7 +1674,16 @@ assert storedBlock.findDatanode(dn) < 0 
     }
 
     if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) {
-      toCorrupt.add(storedBlock);
+      if (namesystem.isInStandbyState()) {
+        // If the block is an out-of-date generation stamp or state,
+        // but we're the standby, we shouldn't treat it as corrupt,
+        // but instead just queue it for later processing.
+        queueReportedBlock(dn, storedBlock, reportedState,
+            QUEUE_REASON_CORRUPT_STATE);
+
+      } else {
+        toCorrupt.add(storedBlock);
+      }
       return storedBlock;
     }
 
@@ -1633,6 +1701,68 @@ assert storedBlock.findDatanode(dn) < 0 
     return storedBlock;
   }
 
+  /**
+   * Queue the given reported block for later processing in the
+   * standby node. {@see PendingDataNodeMessages}.
+   * @param reason a textual reason to report in the debug logs
+   */
+  private void queueReportedBlock(DatanodeDescriptor dn, Block block,
+      ReplicaState reportedState, String reason) {
+    assert namesystem.isInStandbyState();
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Queueing reported block " + block +
+          " in state " + reportedState + 
+          " from datanode " + dn + " for later processing " +
+          "because " + reason + ".");
+    }
+    pendingDNMessages.enqueueReportedBlock(dn, block, reportedState);
+  }
+
+  /**
+   * Try to process any messages that were previously queued for the given
+   * block. This is called from FSEditLogLoader whenever a block's state
+   * in the namespace has changed or a new block has been created.
+   */
+  public void processQueuedMessagesForBlock(Block b) throws IOException {
+    Queue<ReportedBlockInfo> queue = pendingDNMessages.takeBlockQueue(b);
+    if (queue == null) {
+      // Nothing to re-process
+      return;
+    }
+    processQueuedMessages(queue);
+  }
+  
+  private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
+      throws IOException {
+    for (ReportedBlockInfo rbi : rbis) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing previouly queued message " + rbi);
+      }
+      processAndHandleReportedBlock(
+          rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null);
+    }
+  }
+  
+  /**
+   * Process any remaining queued datanode messages after entering
+   * active state. At this point they will not be re-queued since
+   * we are the definitive master node and thus should be up-to-date
+   * with the namespace information.
+   */
+  public void processAllPendingDNMessages() throws IOException {
+    assert !namesystem.isInStandbyState() :
+      "processAllPendingDNMessages() should be called after exiting " +
+      "standby state!";
+    int count = pendingDNMessages.count();
+    if (count > 0) {
+      LOG.info("Processing " + count + " messages from DataNodes " +
+          "that were previously queued during standby state.");
+    }
+    processQueuedMessages(pendingDNMessages.takeAll());
+    assert pendingDNMessages.count() == 0;
+  }
+
   /*
    * The next two methods test the various cases under which we must conclude
    * the replica is corrupt, or under construction.  These are laid out
@@ -1742,13 +1872,15 @@ assert storedBlock.findDatanode(dn) < 0 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
     if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
-        && numCurrentReplica >= minReplication)
+        && numCurrentReplica >= minReplication) {
       storedBlock = completeBlock(storedBlock.getINode(), storedBlock, false);
-
-    // check whether safe replication is reached for the block
-    // only complete blocks are counted towards that
-    if(storedBlock.isComplete())
+    } else if (storedBlock.isComplete()) {
+      // check whether safe replication is reached for the block
+      // only complete blocks are counted towards that.
+      // In the case that the block just became complete above, completeBlock()
+      // handles the safe block count maintenance.
       namesystem.incrementSafeBlockCount(numCurrentReplica);
+    }
   }
 
   /**
@@ -1807,15 +1939,17 @@ assert storedBlock.findDatanode(dn) < 0 
       + pendingReplications.getNumReplicas(storedBlock);
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
-        numLiveReplicas >= minReplication)
+        numLiveReplicas >= minReplication) {
       storedBlock = completeBlock(fileINode, storedBlock, false);
-
-    // check whether safe replication is reached for the block
-    // only complete blocks are counted towards that
-    // Is no-op if not in safe mode.
-    if(storedBlock.isComplete())
+    } else if (storedBlock.isComplete()) {
+      // check whether safe replication is reached for the block
+      // only complete blocks are counted towards that
+      // Is no-op if not in safe mode.
+      // In the case that the block just became complete above, completeBlock()
+      // handles the safe block count maintenance.
       namesystem.incrementSafeBlockCount(numCurrentReplica);
-
+    }
+    
     // if file is under construction, then done for now
     if (fileINode.isUnderConstruction()) {
       return storedBlock;
@@ -2514,7 +2648,7 @@ assert storedBlock.findDatanode(dn) < 0 
   }
 
   public int getActiveBlockCount() {
-    return blocksMap.size() - (int)invalidateBlocks.numBlocks();
+    return blocksMap.size();
   }
 
   public DatanodeDescriptor[] getNodes(BlockInfo block) {

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1238940&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Wed Feb  1 05:16:49 2012
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * In the Standby Node, we can receive messages about blocks
+ * before they are actually available in the namespace, or while
+ * they have an outdated state in the namespace. In those cases,
+ * we queue those block-related messages in this structure.
+ * */  
+class PendingDataNodeMessages {
+  
+  Map<Block, Queue<ReportedBlockInfo>> queueByBlockId =
+    Maps.newHashMap();
+  private int count = 0;
+  
+    
+  static class ReportedBlockInfo {
+    private final Block block;
+    private final DatanodeDescriptor dn;
+    private final ReplicaState reportedState;
+
+    ReportedBlockInfo(DatanodeDescriptor dn, Block block,
+        ReplicaState reportedState) {
+      this.dn = dn;
+      this.block = block;
+      this.reportedState = reportedState;
+    }
+
+    Block getBlock() {
+      return block;
+    }
+
+    DatanodeDescriptor getNode() {
+      return dn;
+    }
+
+    ReplicaState getReportedState() {
+      return reportedState;
+    }
+
+    @Override
+    public String toString() {
+      return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+          + ", reportedState=" + reportedState + "]";
+    }
+  }
+  
+  void enqueueReportedBlock(DatanodeDescriptor dn, Block block,
+      ReplicaState reportedState) {
+    block = new Block(block);
+    getBlockQueue(block).add(
+        new ReportedBlockInfo(dn, block, reportedState));
+    count++;
+  }
+  
+  /**
+   * @return any messages that were previously queued for the given block,
+   * or null if no messages were queued.
+   */
+  Queue<ReportedBlockInfo> takeBlockQueue(Block block) {
+    Queue<ReportedBlockInfo> queue = queueByBlockId.remove(block);
+    if (queue != null) {
+      count -= queue.size();
+    }
+    return queue;
+  }
+
+
+  private Queue<ReportedBlockInfo> getBlockQueue(Block block) {
+    Queue<ReportedBlockInfo> queue = queueByBlockId.get(block);
+    if (queue == null) {
+      queue = Lists.newLinkedList();
+      queueByBlockId.put(block, queue);
+    }
+    return queue;
+  }
+  
+  public int count() {
+    return count ;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<Block, Queue<ReportedBlockInfo>> entry :
+      queueByBlockId.entrySet()) {
+      sb.append("Block " + entry.getKey() + ":\n");
+      for (ReportedBlockInfo rbi : entry.getValue()) {
+        sb.append("  ").append(rbi).append("\n");
+      }
+    }
+    return sb.toString();
+  }
+
+  public Iterable<ReportedBlockInfo> takeAll() {
+    List<ReportedBlockInfo> rbis = Lists.newArrayListWithCapacity(
+        count);
+    for (Queue<ReportedBlockInfo> q : queueByBlockId.values()) {
+      rbis.addAll(q);
+    }
+    queueByBlockId.clear();
+    count = 0;
+    return rbis;
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Feb  1 05:16:49 2012
@@ -66,7 +66,6 @@ import com.google.common.base.Joiner;
 @InterfaceStability.Evolving
 public class FSEditLogLoader {
   private final FSNamesystem fsNamesys;
-  private long maxGenStamp = 0;
 
   public FSEditLogLoader(FSNamesystem fsNamesys) {
     this.fsNamesys = fsNamesys;
@@ -91,15 +90,6 @@ public class FSEditLogLoader {
           + " of size " + edits.length() + " edits # " + numEdits 
           + " loaded in " + (now()-startTime)/1000 + " seconds.");
     } finally {
-      fsNamesys.setBlockTotal();
-      
-      // Delay the notification of genstamp updates until after
-      // setBlockTotal() above. Otherwise, we will mark blocks
-      // as "safe" before they've been incorporated in the expected
-      // totalBlocks and threshold for SafeMode -- triggering an
-      // assertion failure and/or exiting safemode too early!
-      fsNamesys.notifyGenStampUpdate(maxGenStamp);
-      
       edits.close();
       fsNamesys.writeUnlock();
     }
@@ -183,6 +173,12 @@ public class FSEditLogLoader {
     switch (op.opCode) {
     case OP_ADD: {
       AddCloseOp addCloseOp = (AddCloseOp)op;
+      if (FSNamesystem.LOG.isDebugEnabled()) {
+        FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+            " numblocks : " + addCloseOp.blocks.length +
+            " clientHolder " + addCloseOp.clientName +
+            " clientMachine " + addCloseOp.clientMachine);
+      }
 
       // See if the file already exists (persistBlocks call)
       INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
@@ -197,13 +193,6 @@ public class FSEditLogLoader {
         }
         long blockSize = addCloseOp.blockSize;
         
-        if (FSNamesystem.LOG.isDebugEnabled()) {
-          FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
-              " numblocks : " + addCloseOp.blocks.length +
-              " clientHolder " + addCloseOp.clientName +
-              " clientMachine " + addCloseOp.clientMachine);
-        }
-
         // Older versions of HDFS does not store the block size in inode.
         // If the file has more than one block, use the size of the
         // first block as the blocksize. Otherwise use the default
@@ -227,12 +216,18 @@ public class FSEditLogLoader {
             addCloseOp.atime, blockSize);
 
         fsNamesys.prepareFileForWrite(addCloseOp.path, node,
-            addCloseOp.clientName, addCloseOp.clientMachine, null);
+            addCloseOp.clientName, addCloseOp.clientMachine, null,
+            false);
       } else { // This is OP_ADD on an existing file
         if (!oldFile.isUnderConstruction()) {
           // This is a call to append() on an already-closed file.
+          if (FSNamesystem.LOG.isDebugEnabled()) {
+            FSNamesystem.LOG.debug("Reopening an already-closed file " +
+                "for append");
+          }
           fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
-              addCloseOp.clientName, addCloseOp.clientMachine, null);
+              addCloseOp.clientName, addCloseOp.clientMachine, null,
+              false);
           oldFile = getINodeFile(fsDir, addCloseOp.path);
         }
         
@@ -243,6 +238,13 @@ public class FSEditLogLoader {
     case OP_CLOSE: {
       AddCloseOp addCloseOp = (AddCloseOp)op;
       
+      if (FSNamesystem.LOG.isDebugEnabled()) {
+        FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+            " numblocks : " + addCloseOp.blocks.length +
+            " clientHolder " + addCloseOp.clientName +
+            " clientMachine " + addCloseOp.clientMachine);
+      }
+
       INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
       if (oldFile == null) {
         throw new IOException("Operation trying to close non-existent file " +
@@ -478,14 +480,23 @@ public class FSEditLogLoader {
       }
       
       oldBlock.setNumBytes(newBlock.getNumBytes());
+      boolean changeMade =
+        oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
       oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
       
       if (oldBlock instanceof BlockInfoUnderConstruction &&
           (!isLastBlock || addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE)) {
+        changeMade = true;
         fsNamesys.getBlockManager().forceCompleteBlock(
             (INodeFileUnderConstruction)file,
             (BlockInfoUnderConstruction)oldBlock);
       }
+      if (changeMade) {
+        // The state or gen-stamp of the block has changed. So, we may be
+        // able to process some messages from datanodes that we previously
+        // were unable to process.
+        fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
+      }
     }
     
     if (addCloseOp.blocks.length < oldBlocks.length) {
@@ -517,13 +528,9 @@ public class FSEditLogLoader {
         }
         fsNamesys.getBlockManager().addINode(newBI, file);
         file.addBlock(newBI);
+        fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
       }
     }
-    
-    // Record the max genstamp seen
-    for (Block b : addCloseOp.blocks) {
-      maxGenStamp = Math.max(maxGenStamp, b.getGenerationStamp());
-    }
   }
 
   private static void dumpOpCounts(

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Feb  1 05:16:49 2012
@@ -154,10 +154,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReceivedDeleteMessage;
-import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage;
-import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage;
-import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.DataNodeMessage;
 import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@@ -321,8 +317,6 @@ public class FSNamesystem implements Nam
   // lock to protect FSNamesystem.
   private ReentrantReadWriteLock fsLock;
 
-  private PendingDataNodeMessages pendingDatanodeMessages = new PendingDataNodeMessages();
-  
   /**
    * Used when this NN is in standby state to read from the shared edit log.
    */
@@ -342,11 +336,7 @@ public class FSNamesystem implements Nam
   private boolean haEnabled;
 
   private final Configuration conf;
-  
-  PendingDataNodeMessages getPendingDataNodeMessages() {
-    return pendingDatanodeMessages;
-  }
-  
+    
   /**
    * Instantiates an FSNamesystem loaded from the image and edits
    * directories specified in the passed Configuration.
@@ -481,6 +471,8 @@ public class FSNamesystem implements Nam
     try {
       nnResourceChecker = new NameNodeResourceChecker(conf);
       checkAvailableResources();
+      assert safeMode != null &&
+        !safeMode.initializedReplQueues;
       setBlockTotal();
       blockManager.activate(conf);
       this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
@@ -531,6 +523,7 @@ public class FSNamesystem implements Nam
         LOG.info("Reprocessing replication and invalidation queues...");
         blockManager.getDatanodeManager().markAllDatanodesStale();
         blockManager.clearQueues();
+        blockManager.processAllPendingDNMessages();
         blockManager.processMisReplicatedBlocks();
         
         if (LOG.isDebugEnabled()) {
@@ -849,8 +842,9 @@ public class FSNamesystem implements Nam
   public boolean isRunning() {
     return fsRunning;
   }
-
-  private boolean isInStandbyState() {
+  
+  @Override
+  public boolean isInStandbyState() {
     if (haContext == null || haContext.getState() == null) {
       // We're still starting up. In this case, if HA is
       // on for the cluster, we always start in standby. Otherwise
@@ -1543,7 +1537,8 @@ public class FSNamesystem implements Nam
           blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
 
       if (append && myFile != null) {
-        return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode);
+        return prepareFileForWrite(
+            src, myFile, holder, clientMachine, clientNode, true);
       } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
@@ -1581,12 +1576,14 @@ public class FSNamesystem implements Nam
    * @param leaseHolder identifier of the lease holder on this file
    * @param clientMachine identifier of the client machine
    * @param clientNode if the client is collocated with a DN, that DN's descriptor
+   * @param writeToEditLog whether to persist this change to the edit log
    * @return the last block locations if the block is partial or null otherwise
    * @throws UnresolvedLinkException
    * @throws IOException
    */
   public LocatedBlock prepareFileForWrite(String src, INode file,
-      String leaseHolder, String clientMachine, DatanodeDescriptor clientNode)
+      String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
+      boolean writeToEditLog)
       throws UnresolvedLinkException, IOException {
     INodeFile node = (INodeFile) file;
     INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
@@ -1601,6 +1598,10 @@ public class FSNamesystem implements Nam
                                     clientNode);
     dir.replaceNode(src, node, cons);
     leaseManager.addLease(cons.getClientName(), src);
+    
+    if (writeToEditLog) {
+      getEditLog().logOpenFile(src, cons);
+    }
 
     return blockManager.convertLastBlockToUnderConstruction(cons);
   }
@@ -2346,9 +2347,45 @@ public class FSNamesystem implements Nam
     if (blocks == null) {
       return;
     }
-    for(Block b : blocks) {
+    
+    // In the case that we are a Standby tailing edits from the
+    // active while in safe-mode, we need to track the total number
+    // of blocks and safe blocks in the system.
+    boolean trackBlockCounts = isSafeModeTrackingBlocks();
+    int numRemovedComplete = 0, numRemovedSafe = 0;
+
+    for (Block b : blocks) {
+      if (trackBlockCounts) {
+        BlockInfo bi = blockManager.getStoredBlock(b);
+        if (bi.isComplete()) {
+          numRemovedComplete++;
+          if (bi.numNodes() >= blockManager.minReplication) {
+            numRemovedSafe++;
+          }
+        }
+      }
       blockManager.removeBlock(b);
     }
+    if (trackBlockCounts) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adjusting safe-mode totals for deletion of " + src + ":" +
+            "decreasing safeBlocks by " + numRemovedSafe +
+            ", totalBlocks by " + numRemovedComplete);
+      }
+      adjustSafeModeBlockTotals(-numRemovedSafe, -numRemovedComplete);
+    }
+  }
+
+  /**
+   * @see SafeModeInfo#shouldIncrementallyTrackBlocks
+   */
+  private boolean isSafeModeTrackingBlocks() {
+    if (!haEnabled) {
+      // Never track blocks incrementally in non-HA code.
+      return false;
+    }
+    SafeModeInfo sm = this.safeMode;
+    return sm != null && sm.shouldIncrementallyTrackBlocks();
   }
 
   /**
@@ -2712,15 +2749,8 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
       if (haContext.getState().equals(NameNode.STANDBY_STATE)) {
         // TODO(HA) we'll never get here, since we check for WRITE operation above!
-        if (isGenStampInFuture(newgenerationstamp)) {
-          LOG.info("Required GS=" + newgenerationstamp
-              + ", Queuing commitBlockSynchronization message");
-          getPendingDataNodeMessages().queueMessage(
-              new PendingDataNodeMessages.CommitBlockSynchronizationMessage(
-                  lastblock, newgenerationstamp, newlength, closeFile, deleteblock,
-                  newtargets, newgenerationstamp));
-          return;
-        }
+        // Need to implement tests, etc, for this - block recovery spanning
+        // failover.
       }
 
       if (isInSafeMode()) {
@@ -3264,6 +3294,8 @@ public class FSNamesystem implements Nam
     boolean initializedReplQueues = false;
     /** Was safemode entered automatically because available resources were low. */
     private boolean resourcesLow = false;
+    /** Should safemode adjust its block totals as blocks come in */
+    private boolean shouldIncrementallyTrackBlocks = false;
     
     /**
      * Creates SafeModeInfo when the name node enters
@@ -3292,6 +3324,18 @@ public class FSNamesystem implements Nam
     }
 
     /**
+     * In the HA case, the StandbyNode can be in safemode while the namespace
+     * is modified by the edit log tailer. In this case, the number of total
+     * blocks changes as edits are processed (eg blocks are added and deleted).
+     * However, we don't want to do the incremental tracking during the
+     * startup-time loading process -- only once the initial total has been
+     * set after the image has been loaded.
+     */
+    private boolean shouldIncrementallyTrackBlocks() {
+      return shouldIncrementallyTrackBlocks;
+    }
+
+    /**
      * Creates SafeModeInfo when safe mode is entered manually, or because
      * available resources are low.
      *
@@ -3476,6 +3520,13 @@ public class FSNamesystem implements Nam
       this.blockThreshold = (int) (blockTotal * threshold);
       this.blockReplQueueThreshold = 
         (int) (blockTotal * replQueueThreshold);
+      if (haEnabled) {
+        // After we initialize the block count, any further namespace
+        // modifications done while in safe mode need to keep track
+        // of the number of total blocks in the system.
+        this.shouldIncrementallyTrackBlocks = true;
+      }
+      
       checkMode();
     }
       
@@ -3485,9 +3536,10 @@ public class FSNamesystem implements Nam
      * @param replication current replication 
      */
     private synchronized void incrementSafeBlockCount(short replication) {
-      if (replication == safeReplication)
+      if (replication == safeReplication) {
         this.blockSafe++;
-      checkMode();
+        checkMode();
+      }
     }
       
     /**
@@ -3496,9 +3548,11 @@ public class FSNamesystem implements Nam
      * @param replication current replication 
      */
     private synchronized void decrementSafeBlockCount(short replication) {
-      if (replication == safeReplication-1)
+      if (replication == safeReplication-1) {
         this.blockSafe--;
-      checkMode();
+        assert blockSafe >= 0 || isManual();
+        checkMode();
+      }
     }
 
     /**
@@ -3636,6 +3690,26 @@ public class FSNamesystem implements Nam
         + "BlockManager data: active="  + activeBlocks);
       }
     }
+
+    private void adjustBlockTotals(int deltaSafe, int deltaTotal) {
+      if (!shouldIncrementallyTrackBlocks) {
+        return;
+      }
+      assert haEnabled;
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adjusting block totals from " +
+            blockSafe + "/" + blockTotal + " to " +
+            (blockSafe + deltaSafe) + "/" + (blockTotal + deltaTotal));
+      }
+      assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " +
+        blockSafe + " by " + deltaSafe + ": would be negative";
+      assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " +
+        blockTotal + " by " + deltaTotal + ": would be negative";
+      
+      blockSafe += deltaSafe;
+      setBlockTotal(blockTotal + deltaTotal);
+    }
   }
     
   /**
@@ -3741,7 +3815,24 @@ public class FSNamesystem implements Nam
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null) // mostly true
       return;
-    safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
+    BlockInfo storedBlock = blockManager.getStoredBlock(b);
+    if (storedBlock.isComplete()) {
+      safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
+    }
+  }
+  
+  /**
+   * Adjust the total number of blocks safe and expected during safe mode.
+   * If safe mode is not currently on, this is a no-op.
+   * @param deltaSafe the change in number of safe blocks
+   * @param deltaTotal the change i nnumber of total blocks expected
+   */
+  public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
+    if (safeMode == null)
+      return;
+    safeMode.adjustBlockTotals(deltaSafe, deltaTotal);
   }
 
   /**
@@ -4066,6 +4157,11 @@ public class FSNamesystem implements Nam
   }
   
   @Metric
+  public int getPendingDataNodeMessageCount() {
+    return blockManager.getPendingDataNodeMessageCount();
+  }
+  
+  @Metric
   public int getBlockCapacity() {
     return blockManager.getCapacity();
   }
@@ -4912,54 +5008,6 @@ public class FSNamesystem implements Nam
   public boolean isGenStampInFuture(long genStamp) {
     return (genStamp > getGenerationStamp());
   }
-  
-  public void notifyGenStampUpdate(long gs) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generation stamp " + gs + " has been reached. " +
-          "Processing pending messages from DataNodes...");
-    }
-    DataNodeMessage msg = pendingDatanodeMessages.take(gs);
-    while (msg != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Processing previously pending message: " + msg);
-      }
-      try {
-        switch (msg.getType()) {
-        case BLOCK_RECEIVED_DELETE:
-          BlockReceivedDeleteMessage m = (BlockReceivedDeleteMessage) msg;
-          if (NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog
-                .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
-                    + m.getNodeReg().getName() + " "
-                    + m.getReceivedAndDeletedBlocks().length + " blocks.");
-          }
-          this.getBlockManager().processIncrementalBlockReport(m.getNodeReg(),
-              m.getPoolId(), m.getReceivedAndDeletedBlocks());
-          break;
-        case BLOCK_REPORT:
-          BlockReportMessage mbr = (BlockReportMessage) msg;
-          if (NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
-                + "from " + mbr.getNodeReg().getName() + " "
-                + mbr.getBlockList().getNumberOfBlocks() + " blocks");
-          }
-          this.getBlockManager().processReport(mbr.getNodeReg(),
-              mbr.getPoolId(), mbr.getBlockList());
-          break;
-        case COMMIT_BLOCK_SYNCHRONIZATION:
-          CommitBlockSynchronizationMessage mcbm = (CommitBlockSynchronizationMessage) msg;
-          this.commitBlockSynchronization(mcbm.getBlock(),
-              mcbm.getNewgenerationstamp(), mcbm.getNewlength(),
-              mcbm.isCloseFile(), mcbm.isDeleteblock(), mcbm.getNewtargets());
-          break;
-        }
-      } catch (IOException ex) {
-        LOG.warn("Could not process the message " + msg.getType(), ex);
-      }
-      msg = pendingDatanodeMessages.take(gs);
-    }
-  }
-  
   @VisibleForTesting
   public EditLogTailer getEditLogTailer() {
     return editLogTailer;

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Feb  1 05:16:49 2012
@@ -878,16 +878,6 @@ class NameNodeRpcServer implements Namen
       String poolId, long[] blocks) throws IOException {
     verifyRequest(nodeReg);
     BlockListAsLongs blist = new BlockListAsLongs(blocks);
-    if (nn.isStandbyState()) {
-      long maxGs = blist.getMaxGsInBlockList();
-      if (namesystem.isGenStampInFuture(maxGs)) {
-        LOG.info("Required GS="+maxGs+", Queuing blockReport message");
-        namesystem.getPendingDataNodeMessages().queueMessage(
-            new PendingDataNodeMessages.BlockReportMessage(nodeReg, poolId,
-                blist, maxGs));
-        return null;
-      }
-    }
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
            + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
@@ -904,25 +894,6 @@ class NameNodeRpcServer implements Namen
   public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
       ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
     verifyRequest(nodeReg);
-    if (nn.isStandbyState()) {
-      if (receivedAndDeletedBlocks.length > 0) {
-        long maxGs = receivedAndDeletedBlocks[0].getBlock()
-            .getGenerationStamp();
-        for (ReceivedDeletedBlockInfo binfo : receivedAndDeletedBlocks) {
-          if (binfo.getBlock().getGenerationStamp() > maxGs) {
-            maxGs = binfo.getBlock().getGenerationStamp();
-          }
-        }
-        if (namesystem.isGenStampInFuture(maxGs)) {
-          LOG.info("Required GS=" + maxGs
-              + ", Queuing blockReceivedAndDeleted message");
-          namesystem.getPendingDataNodeMessages().queueMessage(
-              new PendingDataNodeMessages.BlockReceivedDeleteMessage(nodeReg,
-                  poolId, receivedAndDeletedBlocks, maxGs));
-          return;
-        }
-      }
-    }
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
           +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java Wed Feb  1 05:16:49 2012
@@ -32,4 +32,10 @@ public interface Namesystem extends RwLo
 
   /** @return the block pool ID */
   public String getBlockPoolId();
+
+  public boolean isInStandbyState();
+
+  public boolean isGenStampInFuture(long generationStamp);
+
+  public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
 }
\ No newline at end of file

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java?rev=1238940&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java Wed Feb  1 05:16:49 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.*;
+
+import java.util.Queue;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+
+
+public class TestPendingDataNodeMessages {
+  PendingDataNodeMessages msgs = new PendingDataNodeMessages();
+  
+  private final Block block1Gs1 = new Block(1, 0, 1);
+  private final Block block1Gs2 = new Block(1, 0, 2);
+  private final Block block1Gs2DifferentInstance =
+    new Block(1, 0, 2);
+  private final Block block2Gs1 = new Block(2, 0, 1);
+  
+  private final DatanodeDescriptor fakeDN = new DatanodeDescriptor(
+      new DatanodeID("fake"));
+  
+  @Test
+  public void testQueues() {
+    msgs.enqueueReportedBlock(fakeDN, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(fakeDN, block1Gs2, ReplicaState.FINALIZED);
+
+    assertEquals(2, msgs.count());
+    
+    // Nothing queued yet for block 2
+    assertNull(msgs.takeBlockQueue(block2Gs1));
+    assertEquals(2, msgs.count());
+    
+    Queue<ReportedBlockInfo> q =
+      msgs.takeBlockQueue(block1Gs2DifferentInstance);
+    assertEquals(
+        "ReportedBlockInfo [block=blk_1_1, dn=fake, reportedState=FINALIZED]," +
+        "ReportedBlockInfo [block=blk_1_2, dn=fake, reportedState=FINALIZED]",
+        Joiner.on(",").join(q));
+    assertEquals(0, msgs.count());
+    
+    // Should be null if we pull again
+    assertNull(msgs.takeBlockQueue(block1Gs1));
+    assertEquals(0, msgs.count());
+  }
+}

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Wed Feb  1 05:16:49 2012
@@ -30,8 +30,8 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.ipc.Server;

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java Wed Feb  1 05:16:49 2012
@@ -21,18 +21,18 @@ import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -40,23 +40,29 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
@@ -360,6 +366,164 @@ public class TestDNFencing {
     FileSystem fs2 = cluster.getFileSystem(1);
     DFSTestUtil.readFile(fs2, TEST_FILE_PATH);
   }
+  
+  /**
+   * Regression test for HDFS-2742. The issue in this bug was:
+   * - DN does a block report while file is open. This BR contains
+   *   the block in RBW state.
+   * - Standby queues the RBW state in PendingDatanodeMessages
+   * - Standby processes edit logs during failover. Before fixing
+   *   this bug, it was mistakenly applying the RBW reported state
+   *   after the block had been completed, causing the block to get
+   *   marked corrupt. Instead, we should now be applying the RBW
+   *   message on OP_ADD, and then the FINALIZED message on OP_CLOSE.
+   */
+  @Test
+  public void testBlockReportsWhileFileBeingWritten() throws Exception {
+    FSDataOutputStream out = fs.create(TEST_FILE_PATH);
+    try {
+      AppendTestUtil.write(out, 0, 10);
+      out.hflush();
+      
+      // Block report will include the RBW replica, but will be
+      // queued on the StandbyNode.
+      cluster.triggerBlockReports();
+      
+    } finally {
+      IOUtils.closeStream(out);
+    }
+
+    cluster.transitionToStandby(0);
+    cluster.transitionToActive(1);
+    
+    // Verify that no replicas are marked corrupt, and that the
+    // file is readable from the failed-over standby.
+    BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager());
+    BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager());
+    assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks());
+    assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks());
+    
+    DFSTestUtil.readFile(fs, TEST_FILE_PATH);
+  }
+  
+  /**
+   * Test that, when a block is re-opened for append, the related
+   * datanode messages are correctly queued by the SBN because
+   * they have future states and genstamps.
+   */
+  @Test
+  public void testQueueingWithAppend() throws Exception {
+    int numQueued = 0;
+    int numDN = cluster.getDataNodes().size();
+    
+    FSDataOutputStream out = fs.create(TEST_FILE_PATH);
+    try {
+      AppendTestUtil.write(out, 0, 10);
+      out.hflush();
+
+      // Opening the file will report RBW replicas, but will be
+      // queued on the StandbyNode.
+      numQueued += numDN; // RBW messages
+    } finally {
+      IOUtils.closeStream(out);
+      numQueued += numDN; // blockReceived messages
+    }
+    
+    cluster.triggerBlockReports();
+    numQueued += numDN;
+    
+    try {
+      out = fs.append(TEST_FILE_PATH);
+      AppendTestUtil.write(out, 10, 10);
+      // RBW replicas once it's opened for append
+      numQueued += numDN;
+
+    } finally {
+      IOUtils.closeStream(out);
+      numQueued += numDN; // blockReceived
+    }
+    
+    cluster.triggerBlockReports();
+    numQueued += numDN;
+
+    assertEquals(numQueued, cluster.getNameNode(1).getNamesystem().
+        getPendingDataNodeMessageCount());
+
+    cluster.transitionToStandby(0);
+    cluster.transitionToActive(1);
+    
+    // Verify that no replicas are marked corrupt, and that the
+    // file is readable from the failed-over standby.
+    BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager());
+    BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager());
+    assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks());
+    assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks());
+    
+    AppendTestUtil.check(fs, TEST_FILE_PATH, 20);
+  }
+  
+  /**
+   * Another regression test for HDFS-2742. This tests the following sequence:
+   * - DN does a block report while file is open. This BR contains
+   *   the block in RBW state.
+   * - The block report is delayed in reaching the standby.
+   * - The file is closed.
+   * - The standby processes the OP_ADD and OP_CLOSE operations before
+   *   the RBW block report arrives.
+   * - The standby should not mark the block as corrupt.
+   */
+  @Test
+  public void testRBWReportArrivesAfterEdits() throws Exception {
+    final CountDownLatch brFinished = new CountDownLatch(1);
+    DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
+      @Override
+      protected Object passThrough(InvocationOnMock invocation)
+          throws Throwable {
+        try {
+          return super.passThrough(invocation);
+        } finally {
+          // inform the test that our block report went through.
+          brFinished.countDown();
+        }
+      }
+    };
+
+    FSDataOutputStream out = fs.create(TEST_FILE_PATH);
+    try {
+      AppendTestUtil.write(out, 0, 10);
+      out.hflush();
+
+      DataNode dn = cluster.getDataNodes().get(0);
+      DatanodeProtocolClientSideTranslatorPB spy =
+        DataNodeAdapter.spyOnBposToNN(dn, nn2);
+      
+      Mockito.doAnswer(delayer)
+        .when(spy).blockReport(
+          Mockito.<DatanodeRegistration>anyObject(),
+          Mockito.anyString(),
+          Mockito.<long[]>anyObject());
+      dn.scheduleAllBlockReport(0);
+      delayer.waitForCall();
+      
+    } finally {
+      IOUtils.closeStream(out);
+    }
+
+    cluster.transitionToStandby(0);
+    cluster.transitionToActive(1);
+    
+    delayer.proceed();
+    brFinished.await();
+    
+    // Verify that no replicas are marked corrupt, and that the
+    // file is readable from the failed-over standby.
+    BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager());
+    BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager());
+    assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks());
+    assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks());
+    
+    DFSTestUtil.readFile(fs, TEST_FILE_PATH);
+  }
 
   /**
    * Print a big banner in the test log to make debug easier.

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java Wed Feb  1 05:16:49 2012
@@ -25,10 +25,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -38,15 +41,19 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
 
 /**
  * Tests that exercise safemode in an HA cluster.
@@ -60,6 +67,12 @@ public class TestHASafeMode {
   private MiniDFSCluster cluster;
   private Runtime mockRuntime = mock(Runtime.class);
   
+  static {
+    ((Log4JLogger)LogFactory.getLog(FSImage.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+  }
+  
   @Before
   public void setupCluster() throws Exception {
     Configuration conf = new Configuration();
@@ -112,7 +125,11 @@ public class TestHASafeMode {
   @Test
   public void testEnterSafeModeInANNShouldNotThrowNPE() throws Exception {
     banner("Restarting active");
+    DFSTestUtil
+      .createFile(fs, new Path("/test"), 3 * BLOCK_SIZE, (short) 3, 1L);
     restartActive();
+    nn0.getRpcServer().transitionToActive();
+
     FSNamesystem namesystem = nn0.getNamesystem();
     String status = namesystem.getSafemode();
     assertTrue("Bad safemode status: '" + status + "'", status
@@ -187,24 +204,14 @@ public class TestHASafeMode {
     banner("Restarting standby");
     restartStandby();
 
-    // We expect it to be stuck in safemode (not the extension) because
-    // the block reports are delayed (since they include blocks
-    // from /test2 which are too-high genstamps.
-    String status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 0 needs additional 3 blocks to reach"));
+    // We expect it not to be stuck in safemode, since those blocks
+    // that are already visible to the SBN should be processed
+    // in the initial block reports.
+    assertSafeMode(nn1, 3, 3);
 
     banner("Waiting for standby to catch up to active namespace");
     HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
-
-    status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 8 has reached the threshold 0.9990 of " +
-            "total blocks 8. Safe mode will be turned off automatically"));
+    assertSafeMode(nn1, 8, 8);
   }
   
   /**
@@ -224,12 +231,7 @@ public class TestHASafeMode {
     banner("Restarting standby");
     restartStandby();
     
-    String status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 3 has reached the threshold 0.9990 of " +
-            "total blocks 3. Safe mode will be turned off automatically"));
+    assertSafeMode(nn1, 3, 3);
     
     // Create a few blocks which will send blockReceived calls to the
     // SBN.
@@ -240,12 +242,7 @@ public class TestHASafeMode {
     banner("Waiting for standby to catch up to active namespace");
     HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
 
-    status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 8 has reached the threshold 0.9990 of " +
-            "total blocks 8. Safe mode will be turned off automatically"));
+    assertSafeMode(nn1, 8, 8);
   }
 
   /**
@@ -285,20 +282,11 @@ public class TestHASafeMode {
 
     banner("Restarting standby");
     restartStandby();
-    String status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 0 needs additional 5 blocks to reach"));
+    assertSafeMode(nn1, 0, 5);
     
     banner("Waiting for standby to catch up to active namespace");
     HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
-    status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 0 has reached the threshold 0.9990 of " +
-            "total blocks 0. Safe mode will be turned off automatically"));
+    assertSafeMode(nn1, 0, 0);
   }
   
   /**
@@ -320,12 +308,7 @@ public class TestHASafeMode {
     restartStandby();
     
     // It will initially have all of the blocks necessary.
-    String status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 10 has reached the threshold 0.9990 of " +
-            "total blocks 10. Safe mode will be turned off automatically"));
+    assertSafeMode(nn1, 10, 10);
 
     // Delete those blocks while the SBN is in safe mode - this
     // should reduce it back below the threshold
@@ -339,23 +322,123 @@ public class TestHASafeMode {
     HATestUtil.waitForDNDeletions(cluster);
     cluster.triggerDeletionReports();
 
-    status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 0 needs additional 10 blocks"));
+    assertSafeMode(nn1, 0, 10);
 
     banner("Waiting for standby to catch up to active namespace");
     HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
 
-    status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 0 has reached the threshold 0.9990 of " +
-            "total blocks 0. Safe mode will be turned off automatically"));
+    assertSafeMode(nn1, 0, 0);
+  }
+  
+  /**
+   * Tests that the standby node properly tracks the number of total
+   * and safe blocks while it is in safe mode. Since safe-mode only
+   * counts completed blocks, append needs to decrement the total
+   * number of blocks and then re-increment when the file is closed
+   * again.
+   */
+  @Test
+  public void testAppendWhileInSafeMode() throws Exception {
+    banner("Starting with NN0 active and NN1 standby, creating some blocks");
+    // Make 4.5 blocks so that append() will re-open an existing block
+    // instead of just adding a new one
+    DFSTestUtil.createFile(fs, new Path("/test"),
+        4*BLOCK_SIZE + BLOCK_SIZE/2, (short) 3, 1L);
+
+    // Roll edit log so that, when the SBN restarts, it will load
+    // the namespace during startup.
+    nn0.getRpcServer().rollEditLog();
+ 
+    banner("Restarting standby");
+    restartStandby();
+    
+    // It will initially have all of the blocks necessary.
+    assertSafeMode(nn1, 5, 5);
+
+    // Append to a block while SBN is in safe mode. This should
+    // not affect safemode initially, since the DN message
+    // will get queued.
+    FSDataOutputStream stm = fs.append(new Path("/test"));
+    try {
+      assertSafeMode(nn1, 5, 5);
+      
+      // if we roll edits now, the SBN should see that it's under construction
+      // and change its total count and safe count down by one, since UC
+      // blocks are not counted by safe mode.
+      HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+      assertSafeMode(nn1, 4, 4);
+    } finally {
+      IOUtils.closeStream(stm);
+    }
+    
+    // Delete those blocks while the SBN is in safe mode - this
+    // should reduce it back below the threshold
+    banner("Removing the blocks without rolling the edit log");
+    fs.delete(new Path("/test"), true);
+    BlockManagerTestUtil.computeAllPendingWork(
+        nn0.getNamesystem().getBlockManager());
+    
+    banner("Triggering deletions on DNs and Deletion Reports");
+    cluster.triggerHeartbeats();
+    HATestUtil.waitForDNDeletions(cluster);
+    cluster.triggerDeletionReports();
+
+    assertSafeMode(nn1, 0, 4);
+
+    banner("Waiting for standby to catch up to active namespace");
+    HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+
+    assertSafeMode(nn1, 0, 0);
+  }
+  
+  /**
+   * Regression test for a bug experienced while developing
+   * HDFS-2742. The scenario here is:
+   * - image contains some blocks
+   * - edits log contains at least one block addition, followed
+   *   by deletion of more blocks than were added.
+   * - When node starts up, some incorrect accounting of block
+   *   totals caused an assertion failure.
+   */
+  @Test
+  public void testBlocksDeletedInEditLog() throws Exception {
+    banner("Starting with NN0 active and NN1 standby, creating some blocks");
+    // Make 4 blocks persisted in the image.
+    DFSTestUtil.createFile(fs, new Path("/test"),
+        4*BLOCK_SIZE, (short) 3, 1L);
+    NameNodeAdapter.enterSafeMode(nn0, false);
+    NameNodeAdapter.saveNamespace(nn0);
+    NameNodeAdapter.leaveSafeMode(nn0, false);
+    
+    // OP_ADD for 2 blocks
+    DFSTestUtil.createFile(fs, new Path("/test2"),
+        2*BLOCK_SIZE, (short) 3, 1L);
+    
+    // OP_DELETE for 4 blocks
+    fs.delete(new Path("/test"), true);
+
+    restartActive();
   }
   
+  private void assertSafeMode(NameNode nn, int safe, int total) {
+    String status = nn1.getNamesystem().getSafemode();
+    if (safe == total) {
+      assertTrue("Bad safemode status: '" + status + "'",
+          status.startsWith(
+            "Safe mode is ON." +
+            "The reported blocks " + safe + " has reached the threshold " +
+            "0.9990 of total blocks " + total + ". Safe mode will be " +
+            "turned off automatically"));
+    } else {
+      int additional = total - safe;
+      assertTrue("Bad safemode status: '" + status + "'",
+          status.startsWith(
+              "Safe mode is ON." +
+              "The reported blocks " + safe + " needs additional " +
+              additional + " blocks"));
+    }
+  }
+
   /**
    * Set up a namesystem with several edits, both deletions and
    * additions, and failover to a new NN while that NN is in
@@ -378,26 +461,107 @@ public class TestHASafeMode {
     banner("Restarting standby");
     restartStandby();
 
-    // We expect it to be stuck in safemode (not the extension) because
-    // the block reports are delayed (since they include blocks
-    // from /test2 which are too-high genstamps.
-    String status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 0 needs additional 3 blocks to reach"));
-
+    // We expect it to be on its way out of safemode, since all of the blocks
+    // from the edit log have been reported.
+    assertSafeMode(nn1, 3, 3);
+    
     // Initiate a failover into it while it's in safemode
     banner("Initiating a failover into NN1 in safemode");
     NameNodeAdapter.abortEditLogs(nn0);
     cluster.transitionToActive(1);
 
-    status = nn1.getNamesystem().getSafemode();
+    assertSafeMode(nn1, 5, 5);
+  }
+  
+  /**
+   * Similar to {@link #testBlocksRemovedWhileInSafeMode()} except that
+   * the OP_DELETE edits arrive at the SBN before the block deletion reports.
+   * The tracking of safe blocks needs to properly account for the removal
+   * of the blocks as well as the safe count. This is a regression test for
+   * HDFS-2742.
+   */
+  @Test
+  public void testBlocksRemovedWhileInSafeModeEditsArriveFirst() throws Exception {
+    banner("Starting with NN0 active and NN1 standby, creating some blocks");
+    DFSTestUtil.createFile(fs, new Path("/test"), 10*BLOCK_SIZE, (short) 3, 1L);
+
+    // Roll edit log so that, when the SBN restarts, it will load
+    // the namespace during startup.
+    nn0.getRpcServer().rollEditLog();
+ 
+    banner("Restarting standby");
+    restartStandby();
+    
+    // It will initially have all of the blocks necessary.
+    String status = nn1.getNamesystem().getSafemode();
     assertTrue("Bad safemode status: '" + status + "'",
         status.startsWith(
             "Safe mode is ON." +
-            "The reported blocks 5 has reached the threshold 0.9990 of " +
-            "total blocks 5. Safe mode will be turned off automatically"));
+            "The reported blocks 10 has reached the threshold 0.9990 of " +
+            "total blocks 10. Safe mode will be turned off automatically"));
+
+    // Delete those blocks while the SBN is in safe mode.
+    // Immediately roll the edit log before the actual deletions are sent
+    // to the DNs.
+    banner("Removing the blocks without rolling the edit log");
+    fs.delete(new Path("/test"), true);
+    HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+
+    // Should see removal of the blocks as well as their contribution to safe block count.
+    assertSafeMode(nn1, 0, 0);
+
+    
+    banner("Triggering sending deletions to DNs and Deletion Reports");
+    BlockManagerTestUtil.computeAllPendingWork(
+        nn0.getNamesystem().getBlockManager());    
+    cluster.triggerHeartbeats();
+    HATestUtil.waitForDNDeletions(cluster);
+    cluster.triggerDeletionReports();
+
+    // No change in assertion status here, but some of the consistency checks
+    // in safemode will fire here if we accidentally decrement safe block count
+    // below 0.    
+    assertSafeMode(nn1, 0, 0);
+  }
+  
+
+  /**
+   * Test that the number of safe blocks is accounted correctly even when
+   * blocks move between under-construction state and completed state.
+   * If a FINALIZED report arrives at the SBN before the block is marked
+   * COMPLETE, then when we get the OP_CLOSE we need to count it as "safe"
+   * at that point. This is a regression test for HDFS-2742.
+   */
+  @Test
+  public void testSafeBlockTracking() throws Exception {
+    banner("Starting with NN0 active and NN1 standby, creating some " +
+    		"UC blocks plus some other blocks to force safemode");
+    DFSTestUtil.createFile(fs, new Path("/other-blocks"), 10*BLOCK_SIZE, (short) 3, 1L);
+
+    List<FSDataOutputStream> stms = Lists.newArrayList();
+    try {
+      for (int i = 0; i < 5; i++) {
+        FSDataOutputStream stm = fs.create(new Path("/test-uc-" + i));
+        stms.add(stm);
+        stm.write(1);
+        stm.hflush();
+      }
+      // Roll edit log so that, when the SBN restarts, it will load
+      // the namespace during startup and enter safemode.
+      nn0.getRpcServer().rollEditLog();
+    } finally {
+      for (FSDataOutputStream stm : stms) {
+        IOUtils.closeStream(stm);
+      }
+    }
+    
+    banner("Restarting SBN");
+    restartStandby();
+    assertSafeMode(nn1, 10, 10);
+
+    banner("Allowing SBN to catch up");
+    HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+    assertSafeMode(nn1, 15, 15);
   }
   
   /**
@@ -425,12 +589,7 @@ public class TestHASafeMode {
     nn0.getRpcServer().rollEditLog();
     
     restartStandby();
-    String status = nn1.getNamesystem().getSafemode();
-    assertTrue("Bad safemode status: '" + status + "'",
-        status.startsWith(
-            "Safe mode is ON." +
-            "The reported blocks 6 has reached the threshold 0.9990 of " +
-            "total blocks 6. Safe mode will be turned off automatically"));    
+    assertSafeMode(nn1, 6, 6);
   }
   
   /**