Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2016/03/01 20:08:34 UTC

hadoop git commit: HDFS-9198. Coalesce IBR processing in the NN. Contributed by Daryn Sharp.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 e14ab939b -> 9fa300ad1


HDFS-9198. Coalesce IBR processing in the NN. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9fa300ad
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9fa300ad
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9fa300ad

Branch: refs/heads/branch-2.7
Commit: 9fa300ad1f41b913a51c35d9c6c8d13a4208550c
Parents: e14ab93
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Mar 1 13:02:05 2016 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Tue Mar 1 13:02:05 2016 -0600

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../server/blockmanagement/BlockManager.java    | 155 ++++++++++++++++-
 .../blockmanagement/DatanodeDescriptor.java     |  19 +-
 .../server/blockmanagement/DatanodeManager.java |   2 +-
 .../blockmanagement/DatanodeStorageInfo.java    |   4 +-
 .../hdfs/server/namenode/CacheManager.java      |   2 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  40 ++++-
 .../namenode/metrics/NameNodeMetrics.java       |  12 ++
 .../hadoop/hdfs/TestDatanodeRegistration.java   | 141 ++++++++++++++-
 .../blockmanagement/TestBlockManager.java       | 173 ++++++++++++++++++-
 .../blockmanagement/TestPendingReplication.java |   4 +-
 .../server/datanode/BlockReportTestBase.java    |  10 +-
 .../datanode/TestIncrementalBrVariations.java   |  11 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |  18 +-
 14 files changed, 547 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2fd8905..4b3b1ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -77,6 +77,8 @@ Release 2.7.3 - UNRELEASED
     HDFS-8914. Document HA support in the HDFS HdfsDesign.md.
     (Lars Francke via wheat9)
 
+    HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh)
+
     HDFS-9648. TestStartup.testImageChecksum is broken by HDFS-9569's message
     change. (Wei-Chiu Chuang via Yongjun Zhang)
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ef784d2..94ac335 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -35,6 +34,11 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -82,6 +86,7 @@ import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
 
@@ -172,6 +177,10 @@ public class BlockManager {
   /** Replication thread. */
   final Daemon replicationThread = new Daemon(new ReplicationMonitor());
   
+  /** Block report thread for handling async reports. */
+  private final BlockReportProcessingThread blockReportThread =
+      new BlockReportProcessingThread();
+
   /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
   final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
@@ -445,12 +454,15 @@ public class BlockManager {
     pendingReplications.start();
     datanodeManager.activate(conf);
     this.replicationThread.start();
+    this.blockReportThread.start();
   }
 
   public void close() {
     try {
       replicationThread.interrupt();
+      blockReportThread.interrupt();
       replicationThread.join(3000);
+      blockReportThread.join(3000);
     } catch (InterruptedException ie) {
     }
     datanodeManager.close();
@@ -1786,7 +1798,7 @@ public class BlockManager {
 
     try {
       node = datanodeManager.getDatanode(nodeID);
-      if (node == null || !node.isAlive) {
+      if (node == null || !node.isRegistered()) {
         throw new IOException(
             "ProcessReport from dead or unregistered node: " + nodeID);
       }
@@ -3121,17 +3133,23 @@ public class BlockManager {
   public void processIncrementalBlockReport(final DatanodeID nodeID,
       final StorageReceivedDeletedBlocks srdb) throws IOException {
     assert namesystem.hasWriteLock();
-    int received = 0;
-    int deleted = 0;
-    int receiving = 0;
     final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
-    if (node == null || !node.isAlive) {
+    if (node == null || !node.isRegistered()) {
       blockLog.warn("BLOCK* processIncrementalBlockReport"
               + " is received from dead or unregistered node {}", nodeID);
       throw new IOException(
           "Got incremental block report from unregistered or dead node");
     }
+    try {
+      processIncrementalBlockReport(node, srdb);
+    } catch (Exception ex) {
+      node.setForceRegistration(true);
+      throw ex;
+    }
+  }
 
+  private void processIncrementalBlockReport(final DatanodeDescriptor node,
+      final StorageReceivedDeletedBlocks srdb) throws IOException {
     DatanodeStorageInfo storageInfo =
         node.getStorageInfo(srdb.getStorage().getStorageID());
     if (storageInfo == null) {
@@ -3143,6 +3161,10 @@ public class BlockManager {
       storageInfo = node.updateStorage(srdb.getStorage());
     }
 
+    int received = 0;
+    int deleted = 0;
+    int receiving = 0;
+
     for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
       switch (rdbi.getStatus()) {
       case DELETED_BLOCK:
@@ -3160,17 +3182,17 @@ public class BlockManager {
         break;
       default:
         String msg = 
-          "Unknown block status code reported by " + nodeID +
+          "Unknown block status code reported by " + node +
           ": " + rdbi;
         blockLog.warn(msg);
         assert false : msg; // if assertions are enabled, throw.
         break;
       }
       blockLog.debug("BLOCK* block {}: {} is received from {}",
-          rdbi.getStatus(), rdbi.getBlock(), nodeID);
+          rdbi.getStatus(), rdbi.getBlock(), node);
     }
     blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
-            + "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving,
+            + "{} receiving: {}, received: {}, deleted: {}", node, receiving,
         received, deleted);
   }
 
@@ -3719,4 +3741,119 @@ public class BlockManager {
     clearQueues();
     blocksMap.clear();
   }
+  // async processing of an action, used for IBRs.
+  public void enqueueBlockOp(final Runnable action) throws IOException {
+    try {
+      blockReportThread.enqueue(action);
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+  }
+
+  // sync batch processing for a full BR.
+  public <T> T runBlockOp(final Callable<T> action)
+      throws IOException {
+    final FutureTask<T> future = new FutureTask<T>(action);
+    enqueueBlockOp(future);
+    try {
+      return future.get();
+    } catch (ExecutionException ee) {
+      Throwable cause = ee.getCause();
+      if (cause == null) {
+        cause = ee;
+      }
+      if (!(cause instanceof IOException)) {
+        cause = new IOException(cause);
+      }
+      throw (IOException)cause;
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new IOException(ie);
+    }
+  }
+
+  @VisibleForTesting
+  public void flushBlockOps() throws IOException {
+    runBlockOp(new Callable<Void>(){
+      @Override
+      public Void call() {
+        return null;
+      }
+    });
+  }
+
+  public int getBlockOpQueueLength() {
+    return blockReportThread.queue.size();
+  }
+
+  private class BlockReportProcessingThread extends Thread {
+    private static final long MAX_LOCK_HOLD_MS = 4;
+    private long lastFull = 0;
+
+    private final BlockingQueue<Runnable> queue =
+        new ArrayBlockingQueue<Runnable>(1024);
+
+    BlockReportProcessingThread() {
+      super("Block report processor");
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      try {
+        processQueue();
+      } catch (Throwable t) {
+        ExitUtil.terminate(1,
+            getName() + " encountered fatal exception: " + t);
+      }
+    }
+
+    private void processQueue() {
+      while (namesystem.isRunning()) {
+        NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+        try {
+          Runnable action = queue.take();
+          // batch as many operations in the write lock until the queue
+          // runs dry, or the max lock hold is reached.
+          int processed = 0;
+          namesystem.writeLock();
+          metrics.setBlockOpsQueued(queue.size() + 1);
+          try {
+            long start = Time.monotonicNow();
+            do {
+              processed++;
+              action.run();
+              if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) {
+                break;
+              }
+              action = queue.poll();
+            } while (action != null);
+          } finally {
+            namesystem.writeUnlock();
+            metrics.addBlockOpsBatched(processed - 1);
+          }
+        } catch (InterruptedException e) {
+          // ignore unless thread was specifically interrupted.
+          if (Thread.interrupted()) {
+            break;
+          }
+        }
+      }
+      queue.clear();
+    }
+
+    void enqueue(Runnable action) throws InterruptedException {
+      if (!queue.offer(action)) {
+        if (!isAlive() && namesystem.isRunning()) {
+          ExitUtil.terminate(1, getName()+" is not running");
+        }
+        long now = Time.monotonicNow();
+        if (now - lastFull > 4000) {
+          lastFull = now;
+          LOG.info("Block report queue is full");
+        }
+        queue.put(action);
+      }
+    }
+  }
 }
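
The core of this patch is the BlockReportProcessingThread added above: a
single daemon thread drains a bounded queue of block operations and batches
as many of them as it can under one namesystem write-lock acquisition,
breaking out after MAX_LOCK_HOLD_MS (4 ms) so other lock waiters can make
progress. Producers that find the queue full fall through to a blocking
put(), which throttles the IPC handlers rather than dropping work. Below is
a minimal sketch of that batching pattern, with a plain ReentrantLock
standing in for the FSNamesystem write lock; all names in it are
illustrative, not from the patch.

// Minimal sketch of the time-budgeted batching pattern, assuming a
// plain ReentrantLock in place of the FSNamesystem write lock.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

public class BatchingConsumer implements Runnable {
  private static final long MAX_LOCK_HOLD_MS = 4;
  private final BlockingQueue<Runnable> queue =
      new ArrayBlockingQueue<>(1024);
  private final ReentrantLock lock = new ReentrantLock();

  public void enqueue(Runnable op) throws InterruptedException {
    queue.put(op); // blocks when the queue is full, throttling producers
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Runnable op = queue.take();  // wait for the first op
        lock.lock();                 // one lock acquisition per batch
        try {
          long start = System.nanoTime();
          do {
            op.run();
            // stop batching once the time budget is spent
            if ((System.nanoTime() - start) / 1_000_000 > MAX_LOCK_HOLD_MS) {
              break;
            }
            op = queue.poll();       // non-blocking: drain until dry
          } while (op != null);
        } finally {
          lock.unlock();
        }
      } catch (InterruptedException ie) {
        return;
      }
    }
  }
}

Batching amortizes the cost of acquiring and releasing the write lock
across many small IBRs; the NameNodeMetrics hunk below exposes the queue
depth (BlockOpsQueued) and batch sizes (BlockOpsBatched) so the effect is
observable.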

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index f263ebd..5890855 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -52,8 +52,6 @@ import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -210,7 +208,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public boolean isAlive = false;
   public boolean needKeyUpdate = false;
 
-  
+  private boolean forceRegistration = false;
+
   // A system administrator can tune the balancer bandwidth parameter
   // (dfs.balance.bandwidthPerSec) dynamically by calling
   // "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
@@ -279,7 +278,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
       return storageMap.get(storageID);
     }
   }
-  DatanodeStorageInfo[] getStorageInfos() {
+  @VisibleForTesting
+  public DatanodeStorageInfo[] getStorageInfos() {
     synchronized (storageMap) {
       final Collection<DatanodeStorageInfo> storages = storageMap.values();
       return storages.toArray(new DatanodeStorageInfo[storages.size()]);
@@ -826,6 +826,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
       storage.setBlockReportCount(0);
     }
     heartbeatedSinceRegistration = false;
+    forceRegistration = false;
   }
 
   /**
@@ -909,6 +910,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
         return false;
     }
     return true;
- }
+  }
+
+  public void setForceRegistration(boolean force) {
+    forceRegistration = force;
+  }
+
+  public boolean isRegistered() {
+    return isAlive && !forceRegistration;
+  }
 }
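
Because IBRs are now applied off the IPC handler thread, a processing
failure can no longer surface to the DataNode as an RPC exception. The
safety net is the forceRegistration flag added above: a failed IBR marks
the descriptor unregistered (isRegistered() returns false), and the
heartbeat check in the DatanodeManager hunk that follows replies with
RegisterCommand.REGISTER. A self-contained sketch of that handshake, with
simplified stand-ins for the patch's types:

// Sketch of the forced re-registration handshake; Node and Command
// are stand-ins for DatanodeDescriptor, DatanodeCommand, and
// RegisterCommand. Run with -ea so the asserts fire.
public class ReRegistrationSketch {
  enum Command { REGISTER, NONE }

  static class Node {
    boolean isAlive = true;
    private boolean forceRegistration = false;

    void setForceRegistration(boolean force) { forceRegistration = force; }
    boolean isRegistered() { return isAlive && !forceRegistration; }
    void register() { forceRegistration = false; } // as updateRegInfo() does
  }

  // mirrors the heartbeat check in the DatanodeManager hunk below
  static Command handleHeartbeat(Node node) {
    return (node == null || !node.isRegistered())
        ? Command.REGISTER : Command.NONE;
  }

  public static void main(String[] args) {
    Node node = new Node();
    node.setForceRegistration(true);                  // an async IBR failed
    assert handleHeartbeat(node) == Command.REGISTER; // next heartbeat
    node.register();                                  // DataNode re-registers
    assert handleHeartbeat(node) == Command.NONE;     // back to normal
  }
}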
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 9caca16..92e87b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1361,7 +1361,7 @@ public class DatanodeManager {
           throw new DisallowedDatanodeException(nodeinfo);
         }
 
-        if (nodeinfo == null || !nodeinfo.isAlive) {
+        if (nodeinfo == null || !nodeinfo.isRegistered()) {
           return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 3adca32..b3e45f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -144,7 +144,7 @@ public class DatanodeStorageInfo {
     this.state = s.getState();
   }
 
-  int getBlockReportCount() {
+  public int getBlockReportCount() {
     return blockReportCount;
   }
 
@@ -202,7 +202,7 @@ public class DatanodeStorageInfo {
     return getState() == State.FAILED && numBlocks != 0;
   }
 
-  String getStorageID() {
+  public String getStorageID() {
     return storageID;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index e5270ad..e19c83d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -916,7 +916,7 @@ public final class CacheManager {
     try {
       final DatanodeDescriptor datanode = 
           blockManager.getDatanodeManager().getDatanode(datanodeID);
-      if (datanode == null || !datanode.isAlive) {
+      if (datanode == null || !datanode.isRegistered()) {
         throw new IOException(
             "processCacheReport from dead or unregistered datanode: " +
             datanode);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index dfd51f8..582e492 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -34,6 +34,7 @@ import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 import com.google.common.collect.Lists;
 
@@ -1300,9 +1301,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // DatanodeProtocol
-  public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
-        String poolId, StorageBlockReport[] reports,
-        BlockReportContext context) throws IOException {
+  public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
+        String poolId, final StorageBlockReport[] reports,
+        final BlockReportContext context) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     if(blockStateChangeLog.isDebugEnabled()) {
@@ -1318,8 +1319,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
       // for the same node and storage, so the value returned by the last
       // call of this loop is the final updated value for noStaleStorage.
       //
-      noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
-          blocks, context, (r == reports.length - 1));
+      final int index = r;
+      noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws IOException {
+          return bm.processReport(nodeReg, reports[index].getStorage(),
+              blocks, context, (index == reports.length - 1));
+        }
+      });
       metrics.incrStorageBlockReportOps();
     }
 
@@ -1347,8 +1354,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // DatanodeProtocol
-  public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
-      StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
+  public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
+      String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
+          throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     metrics.incrBlockReceivedAndDeletedOps();
@@ -1357,8 +1365,22 @@ class NameNodeRpcServer implements NamenodeProtocols {
           +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
           +" blocks.");
     }
-    for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
-      namesystem.processIncrementalBlockReport(nodeReg, r);
+    final BlockManager bm = namesystem.getBlockManager();
+    for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
+      bm.enqueueBlockOp(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            namesystem.processIncrementalBlockReport(nodeReg, r);
+          } catch (Exception ex) {
+            // usually because the node is unregistered/dead.  next heartbeat
+            // will correct the problem
+            blockStateChangeLog.error(
+                "*BLOCK* NameNode.blockReceivedAndDeleted: "
+                    + "failed from " + nodeReg + ": " + ex.getMessage());
+          }
+        }
+      });
     }
   }
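
Note the asymmetry between the two handlers above: blockReceivedAndDeleted
(IBRs) is fire-and-forget through enqueueBlockOp(), freeing the IPC handler
immediately, while blockReport() still needs the per-storage result before
it can reply, so it goes through runBlockOp() and parks the handler until
the queued work completes. A sketch of that sync-over-async pattern,
reusing the hypothetical BatchingConsumer from the earlier sketch
(FutureTask serves as both the queued Runnable and the handle the caller
blocks on):

// Sketch of the runBlockOp() pattern: queue a Callable, block on its
// FutureTask, and unwrap ExecutionException back into IOException.
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public final class SyncOverAsync {
  public static <T> T runOnQueue(BatchingConsumer consumer, Callable<T> work)
      throws IOException {
    FutureTask<T> future = new FutureTask<>(work);
    try {
      consumer.enqueue(future); // a FutureTask is also a Runnable
      return future.get();      // park the caller until the op has run
    } catch (ExecutionException ee) {
      Throwable cause = (ee.getCause() != null) ? ee.getCause() : ee;
      throw (cause instanceof IOException)
          ? (IOException) cause : new IOException(cause);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException(ie);
    }
  }
}

Unwrapping the ExecutionException preserves the IOException contract the
DataNode protocol had when the report was processed inline on the handler
thread.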
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index 31bc164..54b5c6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -76,6 +76,10 @@ public class NameNodeMetrics {
   MutableCounterLong blockReceivedAndDeletedOps;
   @Metric("Number of blockReports from individual storages")
   MutableCounterLong storageBlockReportOps;
+  @Metric("Number of blockReports and blockReceivedAndDeleted queued")
+  MutableGaugeInt blockOpsQueued;
+  @Metric("Number of blockReports and blockReceivedAndDeleted batch processed")
+  MutableCounterLong blockOpsBatched;
 
   @Metric("Number of file system operations")
   public long totalFileOps(){
@@ -267,6 +271,14 @@ public class NameNodeMetrics {
     storageBlockReportOps.incr();
   }
 
+  public void setBlockOpsQueued(int size) {
+    blockOpsQueued.set(size);
+  }
+
+  public void addBlockOpsBatched(int count) {
+    blockOpsBatched.incr(count);
+  }
+
   public void addTransaction(long latency) {
     transactions.add(latency);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
index 0e5d974..9d50c7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
@@ -20,22 +20,32 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.junit.Test;
 
-import java.net.InetAddress;
+import com.google.common.base.Supplier;
+
 import java.net.InetSocketAddress;
 import java.security.Permission;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.doReturn;
@@ -309,4 +319,131 @@ public class TestDatanodeRegistration {
       }
     }
   }
-}
+
+  // IBRs are async operations to free up IPC handlers.  This means the IBR
+  // response will not contain non-IPC level exceptions - which in practice
+  // should not occur other than dead/unregistered node which will trigger a
+  // re-registration.  If a non-IPC exception does occur, the safety net is
+  // a forced re-registration on the next heartbeat.
+  @Test(timeout=10000)
+  public void testForcedRegistration() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 4);
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, Integer.MAX_VALUE);
+
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    cluster.getHttpUri(0);
+    FSNamesystem fsn = cluster.getNamesystem();
+    String bpId = fsn.getBlockPoolId();
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    DatanodeDescriptor dnd =
+        NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+
+    // registration should not change after heartbeat.
+    assertTrue(dnd.isRegistered());
+    DatanodeRegistration lastReg = dn.getDNRegistrationForBP(bpId);
+    waitForHeartbeat(dn, dnd);
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+    // force a re-registration on next heartbeat.
+    dnd.setForceRegistration(true);
+    assertFalse(dnd.isRegistered());
+    waitForHeartbeat(dn, dnd);
+    assertTrue(dnd.isRegistered());
+    DatanodeRegistration newReg = dn.getDNRegistrationForBP(bpId);
+    assertNotSame(lastReg, newReg);
+    lastReg = newReg;
+
+    // registration should not change on subsequent heartbeats.
+    waitForHeartbeat(dn, dnd);
+    assertTrue(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+    assertTrue(waitForBlockReport(dn, dnd));
+    assertTrue(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+    // check that block report is not processed and registration didn't change.
+    dnd.setForceRegistration(true);
+    assertFalse(waitForBlockReport(dn, dnd));
+    assertFalse(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+    // heartbeat should trigger re-registration, and next block report should
+    // not change registration.
+    waitForHeartbeat(dn, dnd);
+    assertTrue(dnd.isRegistered());
+    newReg = dn.getDNRegistrationForBP(bpId);
+    assertNotSame(lastReg, newReg);
+    lastReg = newReg;
+    assertTrue(waitForBlockReport(dn, dnd));
+    assertTrue(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+    // registration doesn't change.
+    ExtendedBlock eb = new ExtendedBlock(bpId, 1234);
+    dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
+    DataNodeTestUtils.triggerDeletionReport(dn);
+    assertTrue(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+    // a failed IBR will effectively unregister the node.
+    boolean failed = false;
+    try {
+      // pass null to cause a failure, since there aren't any other easy
+      // failure modes; a real failure shouldn't otherwise happen.
+      fsn.processIncrementalBlockReport(lastReg, null);
+    } catch (NullPointerException npe) {
+      failed = true;
+    }
+    assertTrue("didn't fail", failed);
+    assertFalse(dnd.isRegistered());
+
+    // should remain unregistered until next heartbeat.
+    dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
+    DataNodeTestUtils.triggerDeletionReport(dn);
+    assertFalse(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+    waitForHeartbeat(dn, dnd);
+    assertTrue(dnd.isRegistered());
+    assertNotSame(lastReg, dn.getDNRegistrationForBP(bpId));
+  }
+
+  private void waitForHeartbeat(final DataNode dn, final DatanodeDescriptor dnd)
+      throws Exception {
+    final long lastUpdate = dnd.getLastUpdateMonotonic();
+    Thread.sleep(1);
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+    DataNodeTestUtils.triggerHeartbeat(dn);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return lastUpdate != dnd.getLastUpdateMonotonic();
+      }
+    }, 10, 100000);
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+  }
+
+  private boolean waitForBlockReport(final DataNode dn,
+      final DatanodeDescriptor dnd) throws Exception {
+    final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+    final long lastCount = storage.getBlockReportCount();
+    dn.triggerBlockReport(
+        new BlockReportOptions.Factory().setIncremental(false).build());
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return lastCount != storage.getBlockReportCount();
+        }
+      }, 10, 100);
+    } catch (TimeoutException te) {
+      return false;
+    }
+    return true;
+  }
+}
\ No newline at end of file
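
The tests above lean on a poll-until-true idiom, GenericTestUtils.waitFor
with a Guava Supplier<Boolean>, to observe asynchronous state changes such
as a new registration or a bumped block-report count. A hand-rolled
equivalent, for illustration only (it uses Java 8's BooleanSupplier for
brevity, where the Hadoop helper takes a Guava Supplier<Boolean>):

// Illustrative stand-in for the poll-until-true helper the tests use.
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public final class WaitFor {
  public static void waitFor(BooleanSupplier check, long intervalMs,
      long timeoutMs) throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!check.getAsBoolean()) {   // poll until the condition holds
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("condition not met in " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);       // back off between polls
    }
  }
}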

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 196a38b..5f08886 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -34,10 +35,22 @@ import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
@@ -66,9 +79,13 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetworkTopology;
 import org.junit.Assert;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -900,4 +917,158 @@ public class TestBlockManager {
     assertFalse("Replicas for block is not stored on enough racks",
         bm.isPlacementPolicySatisfied(blockInfo));
   }
-}
+
+  @Test
+  public void testBlockReportQueueing() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      final FSNamesystem fsn = cluster.getNamesystem();
+      final BlockManager bm = fsn.getBlockManager();
+      final ExecutorService executor = Executors.newCachedThreadPool();
+
+      final CyclicBarrier startBarrier = new CyclicBarrier(2);
+      final CountDownLatch endLatch = new CountDownLatch(3);
+
+      // create a task intended to block while processing, thus causing
+      // the queue to back up.  simulates how a full BR is processed.
+      FutureTask<?> blockingOp = new FutureTask<Void>(
+          new Callable<Void>(){
+            @Override
+            public Void call() throws IOException {
+              return bm.runBlockOp(new Callable<Void>() {
+                @Override
+                public Void call()
+                    throws InterruptedException, BrokenBarrierException {
+                  // use a barrier to control the blocking.
+                  startBarrier.await();
+                  endLatch.countDown();
+                  return null;
+                }
+              });
+            }
+          });
+
+      // create an async task.  simulates how an IBR is processed.
+      Callable<?> asyncOp = new Callable<Void>(){
+        @Override
+        public Void call() throws IOException {
+          bm.enqueueBlockOp(new Runnable() {
+            @Override
+            public void run() {
+              // use the latch to signal if the op has run.
+              endLatch.countDown();
+            }
+          });
+          return null;
+        }
+      };
+
+      // calling get forces its execution so we can test if it's blocked.
+      Future<?> blockedFuture = executor.submit(blockingOp);
+      boolean isBlocked = false;
+      try {
+        // wait 1s for the future to block.  it should run instantaneously.
+        blockedFuture.get(1, TimeUnit.SECONDS);
+      } catch (TimeoutException te) {
+        isBlocked = true;
+      }
+      assertTrue(isBlocked);
+
+      // should effectively return immediately since calls are queued.
+      // however they should be backed up in the queue behind the blocking
+      // operation.
+      executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
+      executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
+
+      // check the async calls are queued, and first is still blocked.
+      assertEquals(2, bm.getBlockOpQueueLength());
+      assertFalse(blockedFuture.isDone());
+
+      // unblock the queue, wait for last op to complete, check the blocked
+      // call has returned
+      startBarrier.await(1, TimeUnit.SECONDS);
+      assertTrue(endLatch.await(1, TimeUnit.SECONDS));
+      assertEquals(0, bm.getBlockOpQueueLength());
+      assertTrue(blockingOp.isDone());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  // spam the block manager with IBRs to verify queuing is occurring.
+  @Test
+  public void testAsyncIBR() throws Exception {
+    Logger.getRootLogger().setLevel(Level.WARN);
+
+    // will create files with many small blocks.
+    final int blkSize = 4*1024;
+    final int fileSize = blkSize * 100;
+    final byte[] buf = new byte[2*blkSize];
+    final int numWriters = 4;
+    final int repl = 3;
+
+    final CyclicBarrier barrier = new CyclicBarrier(numWriters);
+    final CountDownLatch writeLatch = new CountDownLatch(numWriters);
+    final AtomicBoolean failure = new AtomicBoolean();
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, blkSize);
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
+
+    try {
+      cluster.waitActive();
+      // create multiple writer threads to create a file with many blocks.
+      // will test that concurrent writing causes IBR batching in the NN
+      Thread[] writers = new Thread[numWriters];
+      for (int i=0; i < writers.length; i++) {
+        final Path p = new Path("/writer"+i);
+        writers[i] = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              FileSystem fs = cluster.getFileSystem();
+              FSDataOutputStream os =
+                  fs.create(p, true, buf.length, (short)repl, blkSize);
+              // align writers for maximum chance of IBR batching.
+              barrier.await();
+              int remaining = fileSize;
+              while (remaining > 0) {
+                os.write(buf);
+                remaining -= buf.length;
+              }
+              os.close();
+            } catch (Exception e) {
+              e.printStackTrace();
+              failure.set(true);
+            }
+            // let main thread know we are done.
+            writeLatch.countDown();
+          }
+        });
+        writers[i].start();
+      }
+
+      // when and how many IBRs are queued is indeterminate, so just watch
+      // the metrics and verify something was queued during execution.
+      boolean sawQueued = false;
+      while (!writeLatch.await(10, TimeUnit.MILLISECONDS)) {
+        assertFalse(failure.get());
+        MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
+        long queued = MetricsAsserts.getIntGauge("BlockOpsQueued", rb);
+        sawQueued |= (queued > 0);
+      }
+      assertFalse(failure.get());
+      assertTrue(sawQueued);
+
+      // verify that batching of the IBRs occurred.
+      MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
+      long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
+      assertTrue(batched > 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
index 698a38e..06917a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
@@ -298,7 +298,8 @@ public class TestPendingReplication {
           reportDnNum++;
         }
       }
-
+      // IBRs are async, make sure the NN processes all of them.
+      cluster.getNamesystem().getBlockManager().flushBlockOps();
       assertEquals(DATANODE_COUNT - 3,
           blkManager.pendingReplications.getNumReplicas(blocks[0]));
 
@@ -316,6 +317,7 @@ public class TestPendingReplication {
         }
       }
 
+      cluster.getNamesystem().getBlockManager().flushBlockOps();
       assertEquals(DATANODE_COUNT - 3,
           blkManager.pendingReplications.getNumReplicas(blocks[0]));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index c4a2d06..0a57005 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -113,9 +113,13 @@ public abstract class BlockReportTestBase {
 
   @After
   public void shutDownCluster() throws IOException {
-    fs.close();
-    cluster.shutdownDataNodes();
-    cluster.shutdown();
+    if (fs != null) {
+      fs.close();
+    }
+    if (cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+    }
   }
 
   protected static void resetConfiguration() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
index 4e73e6e..79f87c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.UUID;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -182,8 +181,11 @@ public class TestIncrementalBrVariations {
     }
 
     // Make sure that the deleted block from each storage was picked up
-    // by the NameNode.
-    assertThat(cluster.getNamesystem().getMissingBlocksCount(), is((long) reports.length));
+    // by the NameNode. IBRs are async, make sure the NN processes
+    // all of them.
+    cluster.getNamesystem().getBlockManager().flushBlockOps();
+    assertThat(cluster.getNamesystem().getMissingBlocksCount(),
+        is((long) reports.length));
   }
 
   /**
@@ -249,7 +251,8 @@ public class TestIncrementalBrVariations {
 
     // Send the report to the NN.
     cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
-
+    // IBRs are async, make sure the NN processes all of them.
+    cluster.getNamesystem().getBlockManager().flushBlockOps();
     // Make sure that the NN has learned of the new storage.
     DatanodeStorageInfo storageInfo = cluster.getNameNode()
                                              .getNamesystem()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 92c329e..6900230 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -94,14 +96,14 @@ public class TestDeadDatanode {
         null) };
     StorageReceivedDeletedBlocks[] storageBlocks = { 
         new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) };
-    
-    // Ensure blockReceived call from dead datanode is rejected with IOException
-    try {
-      dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
-      fail("Expected IOException is not thrown");
-    } catch (IOException ex) {
-      // Expected
-    }
+
+    // Ensure blockReceived call from dead datanode is not rejected with
+    // IOException, since it's async, but the node remains unregistered.
+    dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
+    BlockManager bm = cluster.getNamesystem().getBlockManager();
+    // IBRs are async, make sure the NN processes all of them.
+    bm.flushBlockOps();
+    assertFalse(bm.getDatanodeManager().getDatanode(reg).isRegistered());
 
     // Ensure blockReport from dead datanode is rejected with IOException
     StorageBlockReport[] report = { new StorageBlockReport(