You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2016/03/01 20:08:34 UTC
hadoop git commit: HDFS-9198. Coalesce IBR processing in the NN.
Contributed by Daryn Sharp.
Repository: hadoop
Updated Branches:
refs/heads/branch-2.7 e14ab939b -> 9fa300ad1
HDFS-9198. Coalesce IBR processing in the NN. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9fa300ad
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9fa300ad
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9fa300ad
Branch: refs/heads/branch-2.7
Commit: 9fa300ad1f41b913a51c35d9c6c8d13a4208550c
Parents: e14ab93
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Mar 1 13:02:05 2016 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Tue Mar 1 13:02:05 2016 -0600
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../server/blockmanagement/BlockManager.java | 155 ++++++++++++++++-
.../blockmanagement/DatanodeDescriptor.java | 19 +-
.../server/blockmanagement/DatanodeManager.java | 2 +-
.../blockmanagement/DatanodeStorageInfo.java | 4 +-
.../hdfs/server/namenode/CacheManager.java | 2 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 40 ++++-
.../namenode/metrics/NameNodeMetrics.java | 12 ++
.../hadoop/hdfs/TestDatanodeRegistration.java | 141 ++++++++++++++-
.../blockmanagement/TestBlockManager.java | 173 ++++++++++++++++++-
.../blockmanagement/TestPendingReplication.java | 4 +-
.../server/datanode/BlockReportTestBase.java | 10 +-
.../datanode/TestIncrementalBrVariations.java | 11 +-
.../hdfs/server/namenode/TestDeadDatanode.java | 18 +-
14 files changed, 547 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2fd8905..4b3b1ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -77,6 +77,8 @@ Release 2.7.3 - UNRELEASED
HDFS-8914. Document HA support in the HDFS HdfsDesign.md.
(Lars Francke via wheat9)
+ HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh)
+
HDFS-9648. TestStartup.testImageChecksum is broken by HDFS-9569's message
change. (Wei-Chiu Chuang via Yongjun Zhang)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ef784d2..94ac335 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -35,6 +34,11 @@ import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -82,6 +86,7 @@ import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
@@ -172,6 +177,10 @@ public class BlockManager {
/** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+ /** Block report thread for handling async reports. */
+ private final BlockReportProcessingThread blockReportThread =
+ new BlockReportProcessingThread();
+
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
@@ -445,12 +454,15 @@ public class BlockManager {
pendingReplications.start();
datanodeManager.activate(conf);
this.replicationThread.start();
+ this.blockReportThread.start();
}
public void close() {
try {
replicationThread.interrupt();
+ blockReportThread.interrupt();
replicationThread.join(3000);
+ blockReportThread.join(3000);
} catch (InterruptedException ie) {
}
datanodeManager.close();
@@ -1786,7 +1798,7 @@ public class BlockManager {
try {
node = datanodeManager.getDatanode(nodeID);
- if (node == null || !node.isAlive) {
+ if (node == null || !node.isRegistered()) {
throw new IOException(
"ProcessReport from dead or unregistered node: " + nodeID);
}
@@ -3121,17 +3133,23 @@ public class BlockManager {
public void processIncrementalBlockReport(final DatanodeID nodeID,
final StorageReceivedDeletedBlocks srdb) throws IOException {
assert namesystem.hasWriteLock();
- int received = 0;
- int deleted = 0;
- int receiving = 0;
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
- if (node == null || !node.isAlive) {
+ if (node == null || !node.isRegistered()) {
blockLog.warn("BLOCK* processIncrementalBlockReport"
+ " is received from dead or unregistered node {}", nodeID);
throw new IOException(
"Got incremental block report from unregistered or dead node");
}
+ try {
+ processIncrementalBlockReport(node, srdb);
+ } catch (Exception ex) {
+ node.setForceRegistration(true);
+ throw ex;
+ }
+ }
+ private void processIncrementalBlockReport(final DatanodeDescriptor node,
+ final StorageReceivedDeletedBlocks srdb) throws IOException {
DatanodeStorageInfo storageInfo =
node.getStorageInfo(srdb.getStorage().getStorageID());
if (storageInfo == null) {
@@ -3143,6 +3161,10 @@ public class BlockManager {
storageInfo = node.updateStorage(srdb.getStorage());
}
+ int received = 0;
+ int deleted = 0;
+ int receiving = 0;
+
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
switch (rdbi.getStatus()) {
case DELETED_BLOCK:
@@ -3160,17 +3182,17 @@ public class BlockManager {
break;
default:
String msg =
- "Unknown block status code reported by " + nodeID +
+ "Unknown block status code reported by " + node +
": " + rdbi;
blockLog.warn(msg);
assert false : msg; // if assertions are enabled, throw.
break;
}
blockLog.debug("BLOCK* block {}: {} is received from {}",
- rdbi.getStatus(), rdbi.getBlock(), nodeID);
+ rdbi.getStatus(), rdbi.getBlock(), node);
}
blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
- + "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving,
+ + "{} receiving: {}, received: {}, deleted: {}", node, receiving,
received, deleted);
}
@@ -3719,4 +3741,119 @@ public class BlockManager {
clearQueues();
blocksMap.clear();
}
+ // async processing of an action, used for IBRs.
+ public void enqueueBlockOp(final Runnable action) throws IOException {
+ try {
+ blockReportThread.enqueue(action);
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ }
+
+ // sync batch processing for a full BR.
+ public <T> T runBlockOp(final Callable<T> action)
+ throws IOException {
+ final FutureTask<T> future = new FutureTask<T>(action);
+ enqueueBlockOp(future);
+ try {
+ return future.get();
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ if (cause == null) {
+ cause = ee;
+ }
+ if (!(cause instanceof IOException)) {
+ cause = new IOException(cause);
+ }
+ throw (IOException)cause;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException(ie);
+ }
+ }
+
+ @VisibleForTesting
+ public void flushBlockOps() throws IOException {
+ runBlockOp(new Callable<Void>(){
+ @Override
+ public Void call() {
+ return null;
+ }
+ });
+ }
+
+ public int getBlockOpQueueLength() {
+ return blockReportThread.queue.size();
+ }
+
+ private class BlockReportProcessingThread extends Thread {
+ private static final long MAX_LOCK_HOLD_MS = 4;
+ private long lastFull = 0;
+
+ private final BlockingQueue<Runnable> queue =
+ new ArrayBlockingQueue<Runnable>(1024);
+
+ BlockReportProcessingThread() {
+ super("Block report processor");
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ processQueue();
+ } catch (Throwable t) {
+ ExitUtil.terminate(1,
+ getName() + " encountered fatal exception: " + t);
+ }
+ }
+
+ private void processQueue() {
+ while (namesystem.isRunning()) {
+ NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+ try {
+ Runnable action = queue.take();
+ // batch as many operations in the write lock until the queue
+ // runs dry, or the max lock hold is reached.
+ int processed = 0;
+ namesystem.writeLock();
+ metrics.setBlockOpsQueued(queue.size() + 1);
+ try {
+ long start = Time.monotonicNow();
+ do {
+ processed++;
+ action.run();
+ if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) {
+ break;
+ }
+ action = queue.poll();
+ } while (action != null);
+ } finally {
+ namesystem.writeUnlock();
+ metrics.addBlockOpsBatched(processed - 1);
+ }
+ } catch (InterruptedException e) {
+ // ignore unless thread was specifically interrupted.
+ if (Thread.interrupted()) {
+ break;
+ }
+ }
+ }
+ queue.clear();
+ }
+
+ void enqueue(Runnable action) throws InterruptedException {
+ if (!queue.offer(action)) {
+ if (!isAlive() && namesystem.isRunning()) {
+ ExitUtil.terminate(1, getName()+" is not running");
+ }
+ long now = Time.monotonicNow();
+ if (now - lastFull > 4000) {
+ lastFull = now;
+ LOG.info("Block report queue is full");
+ }
+ queue.put(action);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index f263ebd..5890855 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -52,8 +52,6 @@ import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.Time;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* This class extends the DatanodeInfo class with ephemeral information (eg
* health, capacity, what blocks are associated with the Datanode) that is
@@ -210,7 +208,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
public boolean isAlive = false;
public boolean needKeyUpdate = false;
-
+ private boolean forceRegistration = false;
+
// A system administrator can tune the balancer bandwidth parameter
// (dfs.balance.bandwidthPerSec) dynamically by calling
// "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
@@ -279,7 +278,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
return storageMap.get(storageID);
}
}
- DatanodeStorageInfo[] getStorageInfos() {
+ @VisibleForTesting
+ public DatanodeStorageInfo[] getStorageInfos() {
synchronized (storageMap) {
final Collection<DatanodeStorageInfo> storages = storageMap.values();
return storages.toArray(new DatanodeStorageInfo[storages.size()]);
@@ -826,6 +826,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
storage.setBlockReportCount(0);
}
heartbeatedSinceRegistration = false;
+ forceRegistration = false;
}
/**
@@ -909,6 +910,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
return false;
}
return true;
- }
+ }
+
+ public void setForceRegistration(boolean force) {
+ forceRegistration = force;
+ }
+
+ public boolean isRegistered() {
+ return isAlive && !forceRegistration;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 9caca16..92e87b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1361,7 +1361,7 @@ public class DatanodeManager {
throw new DisallowedDatanodeException(nodeinfo);
}
- if (nodeinfo == null || !nodeinfo.isAlive) {
+ if (nodeinfo == null || !nodeinfo.isRegistered()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 3adca32..b3e45f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -144,7 +144,7 @@ public class DatanodeStorageInfo {
this.state = s.getState();
}
- int getBlockReportCount() {
+ public int getBlockReportCount() {
return blockReportCount;
}
@@ -202,7 +202,7 @@ public class DatanodeStorageInfo {
return getState() == State.FAILED && numBlocks != 0;
}
- String getStorageID() {
+ public String getStorageID() {
return storageID;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index e5270ad..e19c83d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -916,7 +916,7 @@ public final class CacheManager {
try {
final DatanodeDescriptor datanode =
blockManager.getDatanodeManager().getDatanode(datanodeID);
- if (datanode == null || !datanode.isAlive) {
+ if (datanode == null || !datanode.isRegistered()) {
throw new IOException(
"processCacheReport from dead or unregistered datanode: " +
datanode);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index dfd51f8..582e492 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -34,6 +34,7 @@ import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Callable;
import com.google.common.collect.Lists;
@@ -1300,9 +1301,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // DatanodeProtocol
- public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
- String poolId, StorageBlockReport[] reports,
- BlockReportContext context) throws IOException {
+ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
+ String poolId, final StorageBlockReport[] reports,
+ final BlockReportContext context) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
if(blockStateChangeLog.isDebugEnabled()) {
@@ -1318,8 +1319,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
// for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage.
//
- noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
- blocks, context, (r == reports.length - 1));
+ final int index = r;
+ noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws IOException {
+ return bm.processReport(nodeReg, reports[index].getStorage(),
+ blocks, context, (index == reports.length - 1));
+ }
+ });
metrics.incrStorageBlockReportOps();
}
@@ -1347,8 +1354,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // DatanodeProtocol
- public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
- StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
+ public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
+ String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
+ throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
metrics.incrBlockReceivedAndDeletedOps();
@@ -1357,8 +1365,22 @@ class NameNodeRpcServer implements NamenodeProtocols {
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length
+" blocks.");
}
- for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
- namesystem.processIncrementalBlockReport(nodeReg, r);
+ final BlockManager bm = namesystem.getBlockManager();
+ for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
+ bm.enqueueBlockOp(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ namesystem.processIncrementalBlockReport(nodeReg, r);
+ } catch (Exception ex) {
+ // usually because the node is unregistered/dead. next heartbeat
+ // will correct the problem
+ blockStateChangeLog.error(
+ "*BLOCK* NameNode.blockReceivedAndDeleted: "
+ + "failed from " + nodeReg + ": " + ex.getMessage());
+ }
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index 31bc164..54b5c6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -76,6 +76,10 @@ public class NameNodeMetrics {
MutableCounterLong blockReceivedAndDeletedOps;
@Metric("Number of blockReports from individual storages")
MutableCounterLong storageBlockReportOps;
+ @Metric("Number of blockReports and blockReceivedAndDeleted queued")
+ MutableGaugeInt blockOpsQueued;
+ @Metric("Number of blockReports and blockReceivedAndDeleted batch processed")
+ MutableCounterLong blockOpsBatched;
@Metric("Number of file system operations")
public long totalFileOps(){
@@ -267,6 +271,14 @@ public class NameNodeMetrics {
storageBlockReportOps.incr();
}
+ public void setBlockOpsQueued(int size) {
+ blockOpsQueued.set(size);
+ }
+
+ public void addBlockOpsBatched(int count) {
+ blockOpsBatched.incr(count);
+ }
+
public void addTransaction(long latency) {
transactions.add(latency);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
index 0e5d974..9d50c7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
@@ -20,22 +20,32 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.VersionInfo;
import org.junit.Test;
-import java.net.InetAddress;
+import com.google.common.base.Supplier;
+
import java.net.InetSocketAddress;
import java.security.Permission;
+import java.util.concurrent.TimeoutException;
import static org.junit.Assert.*;
import static org.mockito.Mockito.doReturn;
@@ -309,4 +319,131 @@ public class TestDatanodeRegistration {
}
}
}
-}
+
+ // IBRs are async operations to free up IPC handlers. This means the IBR
+ // response will not contain non-IPC level exceptions - which in practice
+ // should not occur other than dead/unregistered node which will trigger a
+ // re-registration. If a non-IPC exception does occur, the safety net is
+ // a forced re-registration on the next heartbeat.
+ @Test(timeout=10000)
+ public void testForcedRegistration() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 4);
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, Integer.MAX_VALUE);
+
+ final MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ cluster.getHttpUri(0);
+ FSNamesystem fsn = cluster.getNamesystem();
+ String bpId = fsn.getBlockPoolId();
+
+ DataNode dn = cluster.getDataNodes().get(0);
+ DatanodeDescriptor dnd =
+ NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+ DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+
+ // registration should not change after heartbeat.
+ assertTrue(dnd.isRegistered());
+ DatanodeRegistration lastReg = dn.getDNRegistrationForBP(bpId);
+ waitForHeartbeat(dn, dnd);
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+ // force a re-registration on next heartbeat.
+ dnd.setForceRegistration(true);
+ assertFalse(dnd.isRegistered());
+ waitForHeartbeat(dn, dnd);
+ assertTrue(dnd.isRegistered());
+ DatanodeRegistration newReg = dn.getDNRegistrationForBP(bpId);
+ assertNotSame(lastReg, newReg);
+ lastReg = newReg;
+
+ // registration should not change on subsequent heartbeats.
+ waitForHeartbeat(dn, dnd);
+ assertTrue(dnd.isRegistered());
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+ assertTrue(waitForBlockReport(dn, dnd));
+ assertTrue(dnd.isRegistered());
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+ // check that block report is not processed and registration didn't change.
+ dnd.setForceRegistration(true);
+ assertFalse(waitForBlockReport(dn, dnd));
+ assertFalse(dnd.isRegistered());
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+ // heartbeat should trigger re-registration, and next block report should
+ // not change registration.
+ waitForHeartbeat(dn, dnd);
+ assertTrue(dnd.isRegistered());
+ newReg = dn.getDNRegistrationForBP(bpId);
+ assertNotSame(lastReg, newReg);
+ lastReg = newReg;
+ assertTrue(waitForBlockReport(dn, dnd));
+ assertTrue(dnd.isRegistered());
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+ // registration doesn't change.
+ ExtendedBlock eb = new ExtendedBlock(bpId, 1234);
+ dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
+ DataNodeTestUtils.triggerDeletionReport(dn);
+ assertTrue(dnd.isRegistered());
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+ // a failed IBR will effectively unregister the node.
+ boolean failed = false;
+ try {
+ // pass null to cause a failure since there aren't any easy failure
+ // modes since it shouldn't happen.
+ fsn.processIncrementalBlockReport(lastReg, null);
+ } catch (NullPointerException npe) {
+ failed = true;
+ }
+ assertTrue("didn't fail", failed);
+ assertFalse(dnd.isRegistered());
+
+ // should remain unregistered until next heartbeat.
+ dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
+ DataNodeTestUtils.triggerDeletionReport(dn);
+ assertFalse(dnd.isRegistered());
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+ waitForHeartbeat(dn, dnd);
+ assertTrue(dnd.isRegistered());
+ assertNotSame(lastReg, dn.getDNRegistrationForBP(bpId));
+ }
+
+ private void waitForHeartbeat(final DataNode dn, final DatanodeDescriptor dnd)
+ throws Exception {
+ final long lastUpdate = dnd.getLastUpdateMonotonic();
+ Thread.sleep(1);
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+ DataNodeTestUtils.triggerHeartbeat(dn);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return lastUpdate != dnd.getLastUpdateMonotonic();
+ }
+ }, 10, 100000);
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+ }
+
+ private boolean waitForBlockReport(final DataNode dn,
+ final DatanodeDescriptor dnd) throws Exception {
+ final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+ final long lastCount = storage.getBlockReportCount();
+ dn.triggerBlockReport(
+ new BlockReportOptions.Factory().setIncremental(false).build());
+ try {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return lastCount != storage.getBlockReportCount();
+ }
+ }, 10, 100);
+ } catch (TimeoutException te) {
+ return false;
+ }
+ return true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 196a38b..5f08886 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -34,10 +35,22 @@ import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
@@ -66,9 +79,13 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetworkTopology;
import org.junit.Assert;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -900,4 +917,158 @@ public class TestBlockManager {
assertFalse("Replicas for block is not stored on enough racks",
bm.isPlacementPolicySatisfied(blockInfo));
}
-}
+
+ @Test
+ public void testBlockReportQueueing() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ try {
+ cluster.waitActive();
+ final FSNamesystem fsn = cluster.getNamesystem();
+ final BlockManager bm = fsn.getBlockManager();
+ final ExecutorService executor = Executors.newCachedThreadPool();
+
+ final CyclicBarrier startBarrier = new CyclicBarrier(2);
+ final CountDownLatch endLatch = new CountDownLatch(3);
+
+ // create a task intended to block while processing, thus causing
+ // the queue to backup. simulates how a full BR is processed.
+ FutureTask<?> blockingOp = new FutureTask<Void>(
+ new Callable<Void>(){
+ @Override
+ public Void call() throws IOException {
+ return bm.runBlockOp(new Callable<Void>() {
+ @Override
+ public Void call()
+ throws InterruptedException, BrokenBarrierException {
+ // use a barrier to control the blocking.
+ startBarrier.await();
+ endLatch.countDown();
+ return null;
+ }
+ });
+ }
+ });
+
+ // create an async task. simulates how an IBR is processed.
+ Callable<?> asyncOp = new Callable<Void>(){
+ @Override
+ public Void call() throws IOException {
+ bm.enqueueBlockOp(new Runnable() {
+ @Override
+ public void run() {
+ // use the latch to signal if the op has run.
+ endLatch.countDown();
+ }
+ });
+ return null;
+ }
+ };
+
+ // calling get forces its execution so we can test if it's blocked.
+ Future<?> blockedFuture = executor.submit(blockingOp);
+ boolean isBlocked = false;
+ try {
+ // wait 1s for the future to block. it should run instantaneously.
+ blockedFuture.get(1, TimeUnit.SECONDS);
+ } catch (TimeoutException te) {
+ isBlocked = true;
+ }
+ assertTrue(isBlocked);
+
+ // should effectively return immediately since calls are queued.
+ // however they should be backed up in the queue behind the blocking
+ // operation.
+ executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
+ executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
+
+ // check the async calls are queued, and first is still blocked.
+ assertEquals(2, bm.getBlockOpQueueLength());
+ assertFalse(blockedFuture.isDone());
+
+ // unblock the queue, wait for last op to complete, check the blocked
+ // call has returned
+ startBarrier.await(1, TimeUnit.SECONDS);
+ assertTrue(endLatch.await(1, TimeUnit.SECONDS));
+ assertEquals(0, bm.getBlockOpQueueLength());
+ assertTrue(blockingOp.isDone());
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ // spam the block manager with IBRs to verify queuing is occurring.
+ // Watches NameNodeActivity metrics: BlockOpsQueued must be observed
+ // non-zero while writers run, and BlockOpsBatched must end positive.
+ @Test
+ public void testAsyncIBR() throws Exception {
+ Logger.getRootLogger().setLevel(Level.WARN);
+
+ // will create files with many small blocks.
+ final int blkSize = 4*1024;
+ final int fileSize = blkSize * 100;
+ final byte[] buf = new byte[2*blkSize];
+ final int numWriters = 4;
+ final int repl = 3;
+
+ final CyclicBarrier barrier = new CyclicBarrier(numWriters);
+ final CountDownLatch writeLatch = new CountDownLatch(numWriters);
+ final AtomicBoolean failure = new AtomicBoolean();
+
+ final Configuration conf = new HdfsConfiguration();
+ // must SET (not get) the minimum block size, otherwise the NN
+ // rejects the tiny 4KB block size this test creates files with.
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, blkSize);
+ final MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
+
+ try {
+ cluster.waitActive();
+ // create multiple writer threads to create a file with many blocks.
+ // will test that concurrent writing causes IBR batching in the NN
+ Thread[] writers = new Thread[numWriters];
+ for (int i=0; i < writers.length; i++) {
+ final Path p = new Path("/writer"+i);
+ writers[i] = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ FileSystem fs = cluster.getFileSystem();
+ FSDataOutputStream os =
+ fs.create(p, true, buf.length, (short)repl, blkSize);
+ // align writers for maximum chance of IBR batching.
+ barrier.await();
+ int remaining = fileSize;
+ while (remaining > 0) {
+ os.write(buf);
+ remaining -= buf.length;
+ }
+ os.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ failure.set(true);
+ }
+ // let main thread know we are done.
+ writeLatch.countDown();
+ }
+ });
+ writers[i].start();
+ }
+
+ // when and how many IBRs are queued is indeterminate, so just watch
+ // the metrics and verify something was queued during execution.
+ boolean sawQueued = false;
+ while (!writeLatch.await(10, TimeUnit.MILLISECONDS)) {
+ assertFalse(failure.get());
+ MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
+ long queued = MetricsAsserts.getIntGauge("BlockOpsQueued", rb);
+ sawQueued |= (queued > 0);
+ }
+ assertFalse(failure.get());
+ assertTrue(sawQueued);
+
+ // verify that batching of the IBRs occurred.
+ MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
+ long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
+ assertTrue(batched > 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
index 698a38e..06917a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
@@ -298,7 +298,8 @@ public class TestPendingReplication {
reportDnNum++;
}
}
-
+ // IBRs are async, make sure the NN processes all of them.
+ cluster.getNamesystem().getBlockManager().flushBlockOps();
assertEquals(DATANODE_COUNT - 3,
blkManager.pendingReplications.getNumReplicas(blocks[0]));
@@ -316,6 +317,7 @@ public class TestPendingReplication {
}
}
+ cluster.getNamesystem().getBlockManager().flushBlockOps();
assertEquals(DATANODE_COUNT - 3,
blkManager.pendingReplications.getNumReplicas(blocks[0]));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index c4a2d06..0a57005 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -113,9 +113,13 @@ public abstract class BlockReportTestBase {
@After
public void shutDownCluster() throws IOException {
- fs.close();
- cluster.shutdownDataNodes();
- cluster.shutdown();
+ if (fs != null) {
+ fs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdownDataNodes();
+ cluster.shutdown();
+ }
}
protected static void resetConfiguration() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
index 4e73e6e..79f87c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -182,8 +181,11 @@ public class TestIncrementalBrVariations {
}
// Make sure that the deleted block from each storage was picked up
- // by the NameNode.
- assertThat(cluster.getNamesystem().getMissingBlocksCount(), is((long) reports.length));
+ // by the NameNode. IBRs are async, make sure the NN processes
+ // all of them.
+ cluster.getNamesystem().getBlockManager().flushBlockOps();
+ assertThat(cluster.getNamesystem().getMissingBlocksCount(),
+ is((long) reports.length));
}
/**
@@ -249,7 +251,8 @@ public class TestIncrementalBrVariations {
// Send the report to the NN.
cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
-
+ // IBRs are async, make sure the NN processes all of them.
+ cluster.getNamesystem().getBlockManager().flushBlockOps();
// Make sure that the NN has learned of the new storage.
DatanodeStorageInfo storageInfo = cluster.getNameNode()
.getNamesystem()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa300ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 92c329e..6900230 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.IOException;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -94,14 +96,14 @@ public class TestDeadDatanode {
null) };
StorageReceivedDeletedBlocks[] storageBlocks = {
new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) };
-
- // Ensure blockReceived call from dead datanode is rejected with IOException
- try {
- dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
- fail("Expected IOException is not thrown");
- } catch (IOException ex) {
- // Expected
- }
+
+ // Ensure blockReceived call from dead datanode is not rejected with
+ // IOException, since it's async, but the node remains unregistered.
+ dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
+ BlockManager bm = cluster.getNamesystem().getBlockManager();
+ // IBRs are async, make sure the NN processes all of them.
+ bm.flushBlockOps();
+ assertFalse(bm.getDatanodeManager().getDatanode(reg).isRegistered());
// Ensure blockReport from dead datanode is rejected with IOException
StorageBlockReport[] report = { new StorageBlockReport(