You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2015/12/17 04:29:50 UTC
hadoop git commit: Revert this commit as there is a compilation issue
with this patch in branch-2 "HDFS-9198. Coalesce IBR processing in the NN.
(Daryn Sharp via umamahesh)"
Repository: hadoop
Updated Branches:
refs/heads/branch-2 991ce3d63 -> e82135df8
Revert this commit as there is a compilation issue with this patch in branch-2 "HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh)"
This reverts commit 991ce3d6300fe742862f07397f5474b1ed8eb9a4.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e82135df
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e82135df
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e82135df
Branch: refs/heads/branch-2
Commit: e82135df8752d78050bae72324090229bd7f4d56
Parents: 991ce3d
Author: Uma Mahesh <um...@apache.org>
Authored: Wed Dec 16 19:18:30 2015 -0800
Committer: Uma Mahesh <um...@apache.org>
Committed: Wed Dec 16 19:18:30 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 -
.../server/blockmanagement/BlockManager.java | 154 +----------------
.../blockmanagement/DatanodeDescriptor.java | 14 +-
.../server/blockmanagement/DatanodeManager.java | 2 +-
.../blockmanagement/DatanodeStorageInfo.java | 2 +-
.../hdfs/server/namenode/CacheManager.java | 2 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 40 +----
.../namenode/metrics/NameNodeMetrics.java | 12 --
.../hadoop/hdfs/TestDatanodeRegistration.java | 140 +--------------
.../blockmanagement/TestBlockManager.java | 173 +------------------
.../blockmanagement/TestPendingReplication.java | 4 +-
.../server/datanode/BlockReportTestBase.java | 10 +-
.../datanode/TestIncrementalBrVariations.java | 8 +-
.../hdfs/server/namenode/TestDeadDatanode.java | 16 +-
14 files changed, 39 insertions(+), 540 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b6a258e..0263f17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1637,8 +1637,6 @@ Release 2.8.0 - UNRELEASED
HDFS-9430. Remove waitForLoadingFSImage since checkNNStartup has ensured
image loaded and namenode started. (Brahma Reddy Battula via mingma)
- HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh)
-
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1b331ba..4e36c0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -34,11 +34,6 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
@@ -96,7 +91,6 @@ import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
@@ -198,10 +192,6 @@ public class BlockManager implements BlockStatsMXBean {
/** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
- /** Block report thread for handling async reports. */
- private final BlockReportProcessingThread blockReportThread =
- new BlockReportProcessingThread();
-
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
@@ -493,7 +483,6 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.activate(conf);
this.replicationThread.setName("ReplicationMonitor");
this.replicationThread.start();
- this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
}
@@ -502,9 +491,7 @@ public class BlockManager implements BlockStatsMXBean {
bmSafeMode.close();
try {
replicationThread.interrupt();
- blockReportThread.interrupt();
replicationThread.join(3000);
- blockReportThread.join(3000);
} catch (InterruptedException ie) {
}
datanodeManager.close();
@@ -1890,7 +1877,7 @@ public class BlockManager implements BlockStatsMXBean {
try {
node = datanodeManager.getDatanode(nodeID);
- if (node == null || !node.isRegistered()) {
+ if (node == null || !node.isAlive()) {
throw new IOException(
"ProcessReport from dead or unregistered node: " + nodeID);
}
@@ -3242,23 +3229,17 @@ public class BlockManager implements BlockStatsMXBean {
public void processIncrementalBlockReport(final DatanodeID nodeID,
final StorageReceivedDeletedBlocks srdb) throws IOException {
assert namesystem.hasWriteLock();
+ int received = 0;
+ int deleted = 0;
+ int receiving = 0;
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
- if (node == null || !node.isRegistered()) {
+ if (node == null || !node.isAlive()) {
blockLog.warn("BLOCK* processIncrementalBlockReport"
+ " is received from dead or unregistered node {}", nodeID);
throw new IOException(
"Got incremental block report from unregistered or dead node");
}
- try {
- processIncrementalBlockReport(node, srdb);
- } catch (Exception ex) {
- node.setForceRegistration(true);
- throw ex;
- }
- }
- private void processIncrementalBlockReport(final DatanodeDescriptor node,
- final StorageReceivedDeletedBlocks srdb) throws IOException {
DatanodeStorageInfo storageInfo =
node.getStorageInfo(srdb.getStorage().getStorageID());
if (storageInfo == null) {
@@ -3270,10 +3251,6 @@ public class BlockManager implements BlockStatsMXBean {
storageInfo = node.updateStorage(srdb.getStorage());
}
- int received = 0;
- int deleted = 0;
- int receiving = 0;
-
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
switch (rdbi.getStatus()) {
case DELETED_BLOCK:
@@ -3291,17 +3268,17 @@ public class BlockManager implements BlockStatsMXBean {
break;
default:
String msg =
- "Unknown block status code reported by " + node +
+ "Unknown block status code reported by " + nodeID +
": " + rdbi;
blockLog.warn(msg);
assert false : msg; // if assertions are enabled, throw.
break;
}
blockLog.debug("BLOCK* block {}: {} is received from {}",
- rdbi.getStatus(), rdbi.getBlock(), node);
+ rdbi.getStatus(), rdbi.getBlock(), nodeID);
}
blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
- + "{} receiving: {}, received: {}, deleted: {}", node, receiving,
+ + "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving,
received, deleted);
}
@@ -3902,119 +3879,4 @@ public class BlockManager implements BlockStatsMXBean {
return false;
}
- // async processing of an action, used for IBRs.
- public void enqueueBlockOp(final Runnable action) throws IOException {
- try {
- blockReportThread.enqueue(action);
- } catch (InterruptedException ie) {
- throw new IOException(ie);
- }
- }
-
- // sync batch processing for a full BR.
- public <T> T runBlockOp(final Callable<T> action)
- throws IOException {
- final FutureTask<T> future = new FutureTask<T>(action);
- enqueueBlockOp(future);
- try {
- return future.get();
- } catch (ExecutionException ee) {
- Throwable cause = ee.getCause();
- if (cause == null) {
- cause = ee;
- }
- if (!(cause instanceof IOException)) {
- cause = new IOException(cause);
- }
- throw (IOException)cause;
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException(ie);
- }
- }
-
- @VisibleForTesting
- public void flushBlockOps() throws IOException {
- runBlockOp(new Callable<Void>(){
- @Override
- public Void call() {
- return null;
- }
- });
- }
-
- public int getBlockOpQueueLength() {
- return blockReportThread.queue.size();
- }
-
- private class BlockReportProcessingThread extends Thread {
- private static final long MAX_LOCK_HOLD_MS = 4;
- private long lastFull = 0;
-
- private final BlockingQueue<Runnable> queue =
- new ArrayBlockingQueue<Runnable>(1024);
-
- BlockReportProcessingThread() {
- super("Block report processor");
- setDaemon(true);
- }
-
- @Override
- public void run() {
- try {
- processQueue();
- } catch (Throwable t) {
- ExitUtil.terminate(1,
- getName() + " encountered fatal exception: " + t);
- }
- }
-
- private void processQueue() {
- while (namesystem.isRunning()) {
- NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
- try {
- Runnable action = queue.take();
- // batch as many operations in the write lock until the queue
- // runs dry, or the max lock hold is reached.
- int processed = 0;
- namesystem.writeLock();
- metrics.setBlockOpsQueued(queue.size() + 1);
- try {
- long start = Time.monotonicNow();
- do {
- processed++;
- action.run();
- if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) {
- break;
- }
- action = queue.poll();
- } while (action != null);
- } finally {
- namesystem.writeUnlock();
- metrics.addBlockOpsBatched(processed - 1);
- }
- } catch (InterruptedException e) {
- // ignore unless thread was specifically interrupted.
- if (Thread.interrupted()) {
- break;
- }
- }
- }
- queue.clear();
- }
-
- void enqueue(Runnable action) throws InterruptedException {
- if (!queue.offer(action)) {
- if (!isAlive() && namesystem.isRunning()) {
- ExitUtil.terminate(1, getName()+" is not running");
- }
- long now = Time.monotonicNow();
- if (now - lastFull > 4000) {
- lastFull = now;
- LOG.info("Block report queue is full");
- }
- queue.put(action);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 5b9b73e..fbace92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -189,8 +189,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
// This is an optimization, because contains takes O(n) time on Arraylist
private boolean isAlive = false;
private boolean needKeyUpdate = false;
- private boolean forceRegistration = false;
-
+
// A system administrator can tune the balancer bandwidth parameter
// (dfs.balance.bandwidthPerSec) dynamically by calling
// "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
@@ -825,7 +824,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
storage.setBlockReportCount(0);
}
heartbeatedSinceRegistration = false;
- forceRegistration = false;
}
/**
@@ -908,14 +906,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
return false;
}
return true;
- }
-
- public void setForceRegistration(boolean force) {
- forceRegistration = force;
- }
-
- public boolean isRegistered() {
- return isAlive() && !forceRegistration;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 802bb76..0828d91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1412,7 +1412,7 @@ public class DatanodeManager {
throw new DisallowedDatanodeException(nodeinfo);
}
- if (nodeinfo == null || !nodeinfo.isRegistered()) {
+ if (nodeinfo == null || !nodeinfo.isAlive()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index d4658e5..216d6d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -151,7 +151,7 @@ public class DatanodeStorageInfo {
this.state = s.getState();
}
- public int getBlockReportCount() {
+ int getBlockReportCount() {
return blockReportCount;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index b1f936b..4fd9ca8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -934,7 +934,7 @@ public final class CacheManager {
try {
final DatanodeDescriptor datanode =
blockManager.getDatanodeManager().getDatanode(datanodeID);
- if (datanode == null || !datanode.isRegistered()) {
+ if (datanode == null || !datanode.isAlive()) {
throw new IOException(
"processCacheReport from dead or unregistered datanode: " +
datanode);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 7a5e99b..a1fed93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -36,7 +36,6 @@ import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Callable;
import com.google.common.collect.Lists;
@@ -1359,9 +1358,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // DatanodeProtocol
- public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
- String poolId, final StorageBlockReport[] reports,
- final BlockReportContext context) throws IOException {
+ public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
+ String poolId, StorageBlockReport[] reports,
+ BlockReportContext context) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
if(blockStateChangeLog.isDebugEnabled()) {
@@ -1377,14 +1376,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
// for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage.
//
- final int index = r;
- noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
- @Override
- public Boolean call() throws IOException {
- return bm.processReport(nodeReg, reports[index].getStorage(),
- blocks, context, (index == reports.length - 1));
- }
- });
+ noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
+ blocks, context, (r == reports.length - 1));
metrics.incrStorageBlockReportOps();
}
BlockManagerFaultInjector.getInstance().
@@ -1414,9 +1407,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // DatanodeProtocol
- public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
- String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
- throws IOException {
+ public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
+ StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
metrics.incrBlockReceivedAndDeletedOps();
@@ -1425,22 +1417,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length
+" blocks.");
}
- final BlockManager bm = namesystem.getBlockManager();
- for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
- bm.enqueueBlockOp(new Runnable() {
- @Override
- public void run() {
- try {
- namesystem.processIncrementalBlockReport(nodeReg, r);
- } catch (Exception ex) {
- // usually because the node is unregistered/dead. next heartbeat
- // will correct the problem
- blockStateChangeLog.error(
- "*BLOCK* NameNode.blockReceivedAndDeleted: "
- + "failed from " + nodeReg + ": " + ex.getMessage());
- }
- }
- });
+ for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
+ namesystem.processIncrementalBlockReport(nodeReg, r);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index 54b5c6e..31bc164 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -76,10 +76,6 @@ public class NameNodeMetrics {
MutableCounterLong blockReceivedAndDeletedOps;
@Metric("Number of blockReports from individual storages")
MutableCounterLong storageBlockReportOps;
- @Metric("Number of blockReports and blockReceivedAndDeleted queued")
- MutableGaugeInt blockOpsQueued;
- @Metric("Number of blockReports and blockReceivedAndDeleted batch processed")
- MutableCounterLong blockOpsBatched;
@Metric("Number of file system operations")
public long totalFileOps(){
@@ -271,14 +267,6 @@ public class NameNodeMetrics {
storageBlockReportOps.incr();
}
- public void setBlockOpsQueued(int size) {
- blockOpsQueued.set(size);
- }
-
- public void addBlockOpsBatched(int count) {
- blockOpsBatched.incr(count);
- }
-
public void addTransaction(long latency) {
transactions.add(latency);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
index ccac99f..ee9fa4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
@@ -20,32 +20,21 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.VersionInfo;
import org.junit.Test;
-import com.google.common.base.Supplier;
-
import java.net.InetSocketAddress;
import java.security.Permission;
-import java.util.concurrent.TimeoutException;
import static org.junit.Assert.*;
import static org.mockito.Mockito.doReturn;
@@ -319,131 +308,4 @@ public class TestDatanodeRegistration {
}
}
}
-
- // IBRs are async operations to free up IPC handlers. This means the IBR
- // response will not contain non-IPC level exceptions - which in practice
- // should not occur other than dead/unregistered node which will trigger a
- // re-registration. If a non-IPC exception does occur, the safety net is
- // a forced re-registration on the next heartbeat.
- @Test(timeout=10000)
- public void testForcedRegistration() throws Exception {
- final Configuration conf = new HdfsConfiguration();
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 4);
- conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, Integer.MAX_VALUE);
-
- final MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
- cluster.waitActive();
- cluster.getHttpUri(0);
- FSNamesystem fsn = cluster.getNamesystem();
- String bpId = fsn.getBlockPoolId();
-
- DataNode dn = cluster.getDataNodes().get(0);
- DatanodeDescriptor dnd =
- NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
- DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
- DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
-
- // registration should not change after heartbeat.
- assertTrue(dnd.isRegistered());
- DatanodeRegistration lastReg = dn.getDNRegistrationForBP(bpId);
- waitForHeartbeat(dn, dnd);
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-
- // force a re-registration on next heartbeat.
- dnd.setForceRegistration(true);
- assertFalse(dnd.isRegistered());
- waitForHeartbeat(dn, dnd);
- assertTrue(dnd.isRegistered());
- DatanodeRegistration newReg = dn.getDNRegistrationForBP(bpId);
- assertNotSame(lastReg, newReg);
- lastReg = newReg;
-
- // registration should not change on subsequent heartbeats.
- waitForHeartbeat(dn, dnd);
- assertTrue(dnd.isRegistered());
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
- assertTrue(waitForBlockReport(dn, dnd));
- assertTrue(dnd.isRegistered());
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-
- // check that block report is not processed and registration didn't change.
- dnd.setForceRegistration(true);
- assertFalse(waitForBlockReport(dn, dnd));
- assertFalse(dnd.isRegistered());
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-
- // heartbeat should trigger re-registration, and next block report should
- // not change registration.
- waitForHeartbeat(dn, dnd);
- assertTrue(dnd.isRegistered());
- newReg = dn.getDNRegistrationForBP(bpId);
- assertNotSame(lastReg, newReg);
- lastReg = newReg;
- assertTrue(waitForBlockReport(dn, dnd));
- assertTrue(dnd.isRegistered());
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-
- // registration doesn't change.
- ExtendedBlock eb = new ExtendedBlock(bpId, 1234);
- dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
- DataNodeTestUtils.triggerDeletionReport(dn);
- assertTrue(dnd.isRegistered());
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
-
- // a failed IBR will effectively unregister the node.
- boolean failed = false;
- try {
- // pass null to cause a failure since there aren't any easy failure
- // modes since it shouldn't happen.
- fsn.processIncrementalBlockReport(lastReg, null);
- } catch (NullPointerException npe) {
- failed = true;
- }
- assertTrue("didn't fail", failed);
- assertFalse(dnd.isRegistered());
-
- // should remain unregistered until next heartbeat.
- dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
- DataNodeTestUtils.triggerDeletionReport(dn);
- assertFalse(dnd.isRegistered());
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
- waitForHeartbeat(dn, dnd);
- assertTrue(dnd.isRegistered());
- assertNotSame(lastReg, dn.getDNRegistrationForBP(bpId));
- }
-
- private void waitForHeartbeat(final DataNode dn, final DatanodeDescriptor dnd)
- throws Exception {
- final long lastUpdate = dnd.getLastUpdateMonotonic();
- Thread.sleep(1);
- DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
- DataNodeTestUtils.triggerHeartbeat(dn);
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return lastUpdate != dnd.getLastUpdateMonotonic();
- }
- }, 10, 100000);
- DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
- }
-
- private boolean waitForBlockReport(final DataNode dn,
- final DatanodeDescriptor dnd) throws Exception {
- final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
- final long lastCount = storage.getBlockReportCount();
- dn.triggerBlockReport(
- new BlockReportOptions.Factory().setIncremental(false).build());
- try {
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return lastCount != storage.getBlockReportCount();
- }
- }, 10, 100);
- } catch (TimeoutException te) {
- return false;
- }
- return true;
- }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 1cebac1..5c874ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -35,20 +34,8 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
@@ -73,12 +60,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.MetricsAsserts;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -848,158 +831,4 @@ public class TestBlockManager {
Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks,
null, excessTypes));
}
-
- @Test
- public void testBlockReportQueueing() throws Exception {
- Configuration conf = new HdfsConfiguration();
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
- try {
- cluster.waitActive();
- final FSNamesystem fsn = cluster.getNamesystem();
- final BlockManager bm = fsn.getBlockManager();
- final ExecutorService executor = Executors.newCachedThreadPool();
-
- final CyclicBarrier startBarrier = new CyclicBarrier(2);
- final CountDownLatch endLatch = new CountDownLatch(3);
-
- // create a task intended to block while processing, thus causing
- // the queue to backup. simulates how a full BR is processed.
- FutureTask<?> blockingOp = new FutureTask<Void>(
- new Callable<Void>(){
- @Override
- public Void call() throws IOException {
- return bm.runBlockOp(new Callable<Void>() {
- @Override
- public Void call()
- throws InterruptedException, BrokenBarrierException {
- // use a barrier to control the blocking.
- startBarrier.await();
- endLatch.countDown();
- return null;
- }
- });
- }
- });
-
- // create an async task. simulates how an IBR is processed.
- Callable<?> asyncOp = new Callable<Void>(){
- @Override
- public Void call() throws IOException {
- bm.enqueueBlockOp(new Runnable() {
- @Override
- public void run() {
- // use the latch to signal if the op has run.
- endLatch.countDown();
- }
- });
- return null;
- }
- };
-
- // calling get forces its execution so we can test if it's blocked.
- Future<?> blockedFuture = executor.submit(blockingOp);
- boolean isBlocked = false;
- try {
- // wait 1s for the future to block. it should run instantaneously.
- blockedFuture.get(1, TimeUnit.SECONDS);
- } catch (TimeoutException te) {
- isBlocked = true;
- }
- assertTrue(isBlocked);
-
- // should effectively return immediately since calls are queued.
- // however they should be backed up in the queue behind the blocking
- // operation.
- executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
- executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
-
- // check the async calls are queued, and first is still blocked.
- assertEquals(2, bm.getBlockOpQueueLength());
- assertFalse(blockedFuture.isDone());
-
- // unblock the queue, wait for last op to complete, check the blocked
- // call has returned
- startBarrier.await(1, TimeUnit.SECONDS);
- assertTrue(endLatch.await(1, TimeUnit.SECONDS));
- assertEquals(0, bm.getBlockOpQueueLength());
- assertTrue(blockingOp.isDone());
- } finally {
- cluster.shutdown();
- }
- }
-
- // spam the block manager with IBRs to verify queuing is occurring.
- @Test
- public void testAsyncIBR() throws Exception {
- Logger.getRootLogger().setLevel(Level.WARN);
-
- // will create files with many small blocks.
- final int blkSize = 4*1024;
- final int fileSize = blkSize * 100;
- final byte[] buf = new byte[2*blkSize];
- final int numWriters = 4;
- final int repl = 3;
-
- final CyclicBarrier barrier = new CyclicBarrier(numWriters);
- final CountDownLatch writeLatch = new CountDownLatch(numWriters);
- final AtomicBoolean failure = new AtomicBoolean();
-
- final Configuration conf = new HdfsConfiguration();
- conf.getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, blkSize);
- final MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
-
- try {
- cluster.waitActive();
- // create multiple writer threads to create a file with many blocks.
- // will test that concurrent writing causes IBR batching in the NN
- Thread[] writers = new Thread[numWriters];
- for (int i=0; i < writers.length; i++) {
- final Path p = new Path("/writer"+i);
- writers[i] = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- FileSystem fs = cluster.getFileSystem();
- FSDataOutputStream os =
- fs.create(p, true, buf.length, (short)repl, blkSize);
- // align writers for maximum chance of IBR batching.
- barrier.await();
- int remaining = fileSize;
- while (remaining > 0) {
- os.write(buf);
- remaining -= buf.length;
- }
- os.close();
- } catch (Exception e) {
- e.printStackTrace();
- failure.set(true);
- }
- // let main thread know we are done.
- writeLatch.countDown();
- }
- });
- writers[i].start();
- }
-
- // when and how many IBRs are queued is indeterminate, so just watch
- // the metrics and verify something was queued at during execution.
- boolean sawQueued = false;
- while (!writeLatch.await(10, TimeUnit.MILLISECONDS)) {
- assertFalse(failure.get());
- MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
- long queued = MetricsAsserts.getIntGauge("BlockOpsQueued", rb);
- sawQueued |= (queued > 0);
- }
- assertFalse(failure.get());
- assertTrue(sawQueued);
-
- // verify that batching of the IBRs occurred.
- MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
- long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
- assertTrue(batched > 0);
- } finally {
- cluster.shutdown();
- }
- }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
index b5b0cf2..3d399a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
@@ -304,8 +304,7 @@ public class TestPendingReplication {
reportDnNum++;
}
}
- // IBRs are async, make sure the NN processes all of them.
- cluster.getNamesystem().getBlockManager().flushBlockOps();
+
assertEquals(DATANODE_COUNT - 3,
blkManager.pendingReplications.getNumReplicas(blocks[0]));
@@ -323,7 +322,6 @@ public class TestPendingReplication {
}
}
- cluster.getNamesystem().getBlockManager().flushBlockOps();
assertEquals(DATANODE_COUNT - 3,
blkManager.pendingReplications.getNumReplicas(blocks[0]));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 0a57005..c4a2d06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -113,13 +113,9 @@ public abstract class BlockReportTestBase {
@After
public void shutDownCluster() throws IOException {
- if (fs != null) {
- fs.close();
- }
- if (cluster != null) {
- cluster.shutdownDataNodes();
- cluster.shutdown();
- }
+ fs.close();
+ cluster.shutdownDataNodes();
+ cluster.shutdown();
}
protected static void resetConfiguration() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
index 0801701..989e216 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -186,9 +187,7 @@ public class TestIncrementalBrVariations {
}
// Make sure that the deleted block from each storage was picked up
- // by the NameNode. IBRs are async, make sure the NN processes
- // all of them.
- cluster.getNamesystem().getBlockManager().flushBlockOps();
+ // by the NameNode.
assertThat(cluster.getNamesystem().getMissingBlocksCount(),
is((long) reports.length));
}
@@ -257,8 +256,7 @@ public class TestIncrementalBrVariations {
// Send the report to the NN.
cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
- // IBRs are async, make sure the NN processes all of them.
- cluster.getNamesystem().getBlockManager().flushBlockOps();
+
// Make sure that the NN has learned of the new storage.
DatanodeStorageInfo storageInfo = cluster.getNameNode()
.getNamesystem()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e82135df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index bfd026c..c5262d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -100,14 +100,14 @@ public class TestDeadDatanode {
null) };
StorageReceivedDeletedBlocks[] storageBlocks = {
new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) };
-
- // Ensure blockReceived call from dead datanode is not rejected with
- // IOException, since it's async, but the node remains unregistered.
- dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
- BlockManager bm = cluster.getNamesystem().getBlockManager();
- // IBRs are async, make sure the NN processes all of them.
- bm.flushBlockOps();
- assertFalse(bm.getDatanodeManager().getDatanode(reg).isRegistered());
+
+ // Ensure blockReceived call from dead datanode is rejected with IOException
+ try {
+ dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
+ fail("Expected IOException is not thrown");
+ } catch (IOException ex) {
+ // Expected
+ }
// Ensure blockReport from dead datanode is rejected with IOException
StorageBlockReport[] report = { new StorageBlockReport(