You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/08/05 00:55:54 UTC
svn commit: r1154042 - in /hadoop/common/trunk/hdfs: ./
src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hd...
Author: szetszwo
Date: Thu Aug 4 22:55:48 2011
New Revision: 1154042
URL: http://svn.apache.org/viewvc?rev=1154042&view=rev
Log:
HDFS-2108. Move datanode heartbeat handling from namenode package to blockmanagement package.
Added:
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
- copied, changed from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
- copied, changed from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
- copied, changed from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
Removed:
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
Modified:
hadoop/common/trunk/hdfs/CHANGES.txt
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Thu Aug 4 22:55:48 2011
@@ -635,6 +635,9 @@ Trunk (unreleased changes)
HDFS-2225. Refactor file management so it's not in classes which should
be generic. (Ivan Kelly via todd)
+ HDFS-2108. Move datanode heartbeat handling from namenode package to
+ blockmanagement package. (szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Aug 4 22:55:48 2011
@@ -565,7 +565,6 @@ public interface ClientProtocol extends
* <li> [3] contains number of under replicated blocks in the system.</li>
* <li> [4] contains number of blocks with a corrupt replica. </li>
* <li> [5] contains number of blocks without any good replicas left. </li>
- * <li> [5] contains number of blocks without any good replicas left. </li>
* <li> [6] contains the total used space of the block pool. </li>
* </ul>
* Use public constants like {@link #GET_STATS_CAPACITY_IDX} in place of
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Aug 4 22:55:48 2011
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
@@ -46,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -95,11 +94,6 @@ public class BlockManager {
return isBlockTokenEnabled;
}
- /** get the block key update interval */
- public long getBlockKeyUpdateInterval() {
- return blockKeyUpdateInterval;
- }
-
/** get the BlockTokenSecretManager */
public BlockTokenSecretManager getBlockTokenSecretManager() {
return blockTokenSecretManager;
@@ -140,7 +134,8 @@ public class BlockManager {
public final BlocksMap blocksMap;
private final DatanodeManager datanodeManager;
-
+ private final HeartbeatManager heartbeatManager;
+
/** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
@@ -177,7 +172,7 @@ public class BlockManager {
/** The maximum number of outgoing replication streams
* a given node should have at one time
*/
- public int maxReplicationStreams;
+ int maxReplicationStreams;
/** Minimum copies needed or else write is disallowed */
public final int minReplication;
/** Default number of replicas */
@@ -217,22 +212,12 @@ public class BlockManager {
setBlockToken(l);
}
}
-
- /**
- * Update access keys.
- */
- public void updateBlockKey() throws IOException {
- this.blockTokenSecretManager.updateKeys();
- synchronized (namesystem.heartbeats) {
- for (DatanodeDescriptor nodeInfo : namesystem.heartbeats) {
- nodeInfo.needKeyUpdate = true;
- }
- }
- }
public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
namesystem = fsn;
- datanodeManager = new DatanodeManager(fsn, conf);
+ datanodeManager = new DatanodeManager(this, fsn, conf);
+ heartbeatManager = datanodeManager.getHeartbeatManager();
+
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
blockplacement = BlockPlacementPolicy.getInstance(
conf, namesystem, datanodeManager.getNetworkTopology());
@@ -387,6 +372,11 @@ public class BlockManager {
getDatanodeManager().datanodeDump(out);
}
+ /** @return maxReplicationStreams */
+ public int getMaxReplicationStreams() {
+ return maxReplicationStreams;
+ }
+
/**
* @param block
* @return true if the block has minimum replicas
@@ -587,7 +577,8 @@ public class BlockManager {
}
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
final DatanodeDescriptor[] locations = uc.getExpectedLocations();
- return namesystem.createLocatedBlock(uc, locations, pos, false);
+ final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
+ return new LocatedBlock(eb, locations, pos, false);
}
// get block locations
@@ -613,7 +604,8 @@ public class BlockManager {
machines[j++] = d;
}
}
- return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);
+ final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
+ return new LocatedBlock(eb, machines, pos, isCorrupt);
}
/**
@@ -685,8 +677,8 @@ public class BlockManager {
}
- /** Remove a datanode. */
- public void removeDatanode(final DatanodeDescriptor node) {
+ /** Remove the blocks associated to the given datanode. */
+ void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
final Iterator<? extends Block> it = node.getBlockIterator();
while(it.hasNext()) {
removeStoredBlock(it.next(), node);
@@ -694,11 +686,6 @@ public class BlockManager {
node.resetBlocks();
removeFromInvalidates(node.getStorageID());
- datanodeManager.getNetworkTopology().remove(node);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("remove datanode " + node.getName());
- }
}
private void removeFromInvalidates(String storageID, Block block) {
@@ -887,7 +874,7 @@ public class BlockManager {
* @param nodesToProcess number of datanodes to schedule deletion work
* @return total number of block for deletion
*/
- public int computeInvalidateWork(int nodesToProcess) {
+ int computeInvalidateWork(int nodesToProcess) {
int numOfNodes = recentInvalidateSets.size();
nodesToProcess = Math.min(numOfNodes, nodesToProcess);
@@ -927,7 +914,7 @@ public class BlockManager {
*
* @return number of blocks scheduled for replication during this iteration.
*/
- public int computeReplicationWork(int blocksToProcess) throws IOException {
+ private int computeReplicationWork(int blocksToProcess) throws IOException {
// Choose the blocks to be replicated
List<List<Block>> blocksToReplicate =
chooseUnderReplicatedBlocks(blocksToProcess);
@@ -2047,7 +2034,7 @@ public class BlockManager {
* On stopping decommission, check if the node has excess replicas.
* If there are any excess replicas, call processOverReplicatedBlock()
*/
- private void processOverReplicatedBlocksOnReCommission(
+ void processOverReplicatedBlocksOnReCommission(
final DatanodeDescriptor srcNode) {
final Iterator<? extends Block> it = srcNode.getBlockIterator();
while(it.hasNext()) {
@@ -2145,6 +2132,16 @@ public class BlockManager {
return blocksMap.getStoredBlock(block);
}
+
+ /** Should the access keys be updated? */
+ boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
+ final boolean b = isBlockTokenEnabled && blockKeyUpdateInterval < updateTime;
+ if (b) {
+ blockTokenSecretManager.updateKeys();
+ }
+ return b;
+ }
+
/* updates a block in under replication queue */
public void updateNeededReplications(Block block, int curReplicasDelta,
int expectedReplicasDelta) {
@@ -2356,57 +2353,11 @@ public class BlockManager {
}
/**
- * Change, if appropriate, the admin state of a datanode to
- * decommission completed. Return true if decommission is complete.
- */
- boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
- // Check to see if all blocks in this decommissioned
- // node has reached their target replication factor.
- if (node.isDecommissionInProgress()) {
- if (!isReplicationInProgress(node)) {
- node.setDecommissioned();
- LOG.info("Decommission complete for node " + node.getName());
- }
- }
- return node.isDecommissioned();
- }
-
- /** Start decommissioning the specified datanode. */
- void startDecommission(DatanodeDescriptor node) throws IOException {
- if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
- LOG.info("Start Decommissioning node " + node.getName() + " with " +
- node.numBlocks() + " blocks.");
- synchronized (namesystem.heartbeats) {
- namesystem.updateStats(node, false);
- node.startDecommission();
- namesystem.updateStats(node, true);
- }
- node.decommissioningStatus.setStartTime(now());
-
- // all the blocks that reside on this node have to be replicated.
- checkDecommissionStateInternal(node);
- }
- }
-
- /** Stop decommissioning the specified datanodes. */
- void stopDecommission(DatanodeDescriptor node) throws IOException {
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- LOG.info("Stop Decommissioning node " + node.getName());
- synchronized (namesystem.heartbeats) {
- namesystem.updateStats(node, false);
- node.stopDecommission();
- namesystem.updateStats(node, true);
- }
- processOverReplicatedBlocksOnReCommission(node);
- }
- }
-
- /**
* Periodically calls computeReplicationWork().
*/
private class ReplicationMonitor implements Runnable {
- static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
- static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
+ private static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
+ private static final int REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
@Override
public void run() {
@@ -2439,8 +2390,6 @@ public class BlockManager {
*/
int computeDatanodeWork() throws IOException {
int workFound = 0;
- int blocksToProcess = 0;
- int nodesToProcess = 0;
// Blocks should not be replicated or removed if in safe mode.
// It's OK to check safe mode here w/o holding lock, in the worst
// case extra replications will be scheduled, and these will get
@@ -2448,11 +2397,11 @@ public class BlockManager {
if (namesystem.isInSafeMode())
return workFound;
- synchronized (namesystem.heartbeats) {
- blocksToProcess = (int) (namesystem.heartbeats.size() * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
- nodesToProcess = (int) Math.ceil((double) namesystem.heartbeats.size()
- * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
- }
+ final int numlive = heartbeatManager.getLiveDatanodeCount();
+ final int blocksToProcess = numlive
+ * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION;
+ final int nodesToProcess = (int) Math.ceil(numlive
+ * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100.0);
workFound = this.computeReplicationWork(blocksToProcess);
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Thu Aug 4 22:55:48 2011
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Thu Aug 4 22:55:48 2011
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -56,7 +59,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
-import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -75,7 +77,10 @@ import org.apache.hadoop.util.Reflection
public class DatanodeManager {
static final Log LOG = LogFactory.getLog(DatanodeManager.class);
- final FSNamesystem namesystem;
+ private final FSNamesystem namesystem;
+ private final BlockManager blockManager;
+
+ private final HeartbeatManager heartbeatManager;
/**
* Stores the datanode -> block map.
@@ -117,9 +122,14 @@ public class DatanodeManager {
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
- DatanodeManager(final FSNamesystem namesystem, final Configuration conf
+ DatanodeManager(final BlockManager blockManager,
+ final FSNamesystem namesystem, final Configuration conf
) throws IOException {
this.namesystem = namesystem;
+ this.blockManager = blockManager;
+
+ this.heartbeatManager = new HeartbeatManager(namesystem, conf);
+
this.hostsReader = new HostsFileReader(
conf.get(DFSConfigKeys.DFS_HOSTS, ""),
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
@@ -158,17 +168,30 @@ public class DatanodeManager {
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
decommissionthread.start();
+
+ heartbeatManager.activate(conf);
}
void close() {
if (decommissionthread != null) decommissionthread.interrupt();
+ heartbeatManager.close();
}
/** @return the network topology. */
public NetworkTopology getNetworkTopology() {
return networktopology;
}
-
+
+ /** @return the heartbeat manager. */
+ HeartbeatManager getHeartbeatManager() {
+ return heartbeatManager;
+ }
+
+ /** @return the datanode statistics. */
+ public DatanodeStatistics getDatanodeStatistics() {
+ return heartbeatManager;
+ }
+
/** Sort the located blocks by the distance to the target host. */
public void sortLocatedBlocks(final String targethost,
final List<LocatedBlock> locatedblocks) {
@@ -231,9 +254,44 @@ public class DatanodeManager {
}
}
+ /**
+ * Remove a datanode descriptor.
+ * @param nodeInfo datanode descriptor.
+ */
+ private void removeDatanode(DatanodeDescriptor nodeInfo) {
+ assert namesystem.hasWriteLock();
+ heartbeatManager.removeDatanode(nodeInfo);
+ blockManager.removeBlocksAssociatedTo(nodeInfo);
+ networktopology.remove(nodeInfo);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remove datanode " + nodeInfo.getName());
+ }
+ namesystem.checkSafeMode();
+ }
+
+ /**
+ * Remove a datanode
+ * @throws UnregisteredNodeException
+ */
+ public void removeDatanode(final DatanodeID node
+ ) throws UnregisteredNodeException {
+ namesystem.writeLock();
+ try {
+ final DatanodeDescriptor descriptor = getDatanode(node);
+ if (descriptor != null) {
+ removeDatanode(descriptor);
+ } else {
+ NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
+ + node.getName() + " does not exist");
+ }
+ } finally {
+ namesystem.writeUnlock();
+ }
+ }
+
/** Remove a dead datanode. */
- public void removeDeadDatanode(final DatanodeID nodeID) {
- synchronized(namesystem.heartbeats) {
+ void removeDeadDatanode(final DatanodeID nodeID) {
synchronized(datanodeMap) {
DatanodeDescriptor d;
try {
@@ -244,14 +302,13 @@ public class DatanodeManager {
if (d != null && isDatanodeDead(d)) {
NameNode.stateChangeLog.info(
"BLOCK* removeDeadDatanode: lost heartbeat from " + d.getName());
- namesystem.removeDatanode(d);
+ removeDatanode(d);
}
}
- }
}
/** Is the datanode dead? */
- public boolean isDatanodeDead(DatanodeDescriptor node) {
+ boolean isDatanodeDead(DatanodeDescriptor node) {
return (node.getLastUpdate() <
(Util.now() - heartbeatExpireInterval));
}
@@ -423,11 +480,48 @@ public class DatanodeManager {
throws IOException {
// If the registered node is in exclude list, then decommission it
if (inExcludedHostsList(nodeReg, ipAddr)) {
- namesystem.getBlockManager().startDecommission(nodeReg);
+ startDecommission(nodeReg);
+ }
+ }
+
+ /**
+ * Change, if appropriate, the admin state of a datanode to
+ * decommission completed. Return true if decommission is complete.
+ */
+ boolean checkDecommissionState(DatanodeDescriptor node) {
+ // Check to see if all blocks in this decommissioned
+ // node have reached their target replication factor.
+ if (node.isDecommissionInProgress()) {
+ if (!blockManager.isReplicationInProgress(node)) {
+ node.setDecommissioned();
+ LOG.info("Decommission complete for node " + node.getName());
+ }
+ }
+ return node.isDecommissioned();
+ }
+
+ /** Start decommissioning the specified datanode. */
+ private void startDecommission(DatanodeDescriptor node) throws IOException {
+ if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+ LOG.info("Start Decommissioning node " + node.getName() + " with " +
+ node.numBlocks() + " blocks.");
+ heartbeatManager.startDecommission(node);
+ node.decommissioningStatus.setStartTime(now());
+
+ // all the blocks that reside on this node have to be replicated.
+ checkDecommissionState(node);
+ }
+ }
+
+ /** Stop decommissioning the specified datanodes. */
+ void stopDecommission(DatanodeDescriptor node) throws IOException {
+ if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+ LOG.info("Stop Decommissioning node " + node.getName());
+ heartbeatManager.stopDecommission(node);
+ blockManager.processOverReplicatedBlocksOnReCommission(node);
}
}
-
/**
* Generate new storage ID.
*
@@ -483,7 +577,7 @@ public class DatanodeManager {
+ "node from name: " + nodeN.getName());
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
- namesystem.removeDatanode(nodeN);
+ removeDatanode(nodeN);
// physically remove node from datanodeMap
wipeDatanode(nodeN);
nodeN = null;
@@ -525,14 +619,7 @@ public class DatanodeManager {
getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
- synchronized(namesystem.heartbeats) {
- if( !namesystem.heartbeats.contains(nodeS)) {
- namesystem.heartbeats.add(nodeS);
- //update its timestamp
- nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
- nodeS.isAlive = true;
- }
- }
+ heartbeatManager.register(nodeS);
checkDecommissioning(nodeS, dnAddress);
return;
}
@@ -556,12 +643,9 @@ public class DatanodeManager {
checkDecommissioning(nodeDescr, dnAddress);
// also treat the registration message as a heartbeat
- synchronized(namesystem.heartbeats) {
- namesystem.heartbeats.add(nodeDescr);
- nodeDescr.isAlive = true;
- // no need to update its timestamp
- // because its is done when the descriptor is created
- }
+ // no need to update its timestamp
+ // because it is done when the descriptor is created
+ heartbeatManager.addDatanode(nodeDescr);
}
/** Reread include/exclude files. */
@@ -589,12 +673,12 @@ public class DatanodeManager {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node, null)) {
- node.setDisallowed(true); // case 2.
+ node.setDisallowed(true); // case 2.
} else {
if (inExcludedHostsList(node, null)) {
- namesystem.getBlockManager().startDecommission(node); // case 3.
+ startDecommission(node); // case 3.
} else {
- namesystem.getBlockManager().stopDecommission(node); // case 4.
+ stopDecommission(node); // case 4.
}
}
}
@@ -712,7 +796,7 @@ public class DatanodeManager {
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int maxTransfers, int failedVolumes
) throws IOException {
- synchronized (namesystem.heartbeats) {
+ synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
try {
@@ -731,10 +815,8 @@ public class DatanodeManager {
return new DatanodeCommand[]{DatanodeCommand.REGISTER};
}
- namesystem.updateStats(nodeinfo, false);
- nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
- xceiverCount, failedVolumes);
- namesystem.updateStats(nodeinfo, true);
+ heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
+ remaining, blockPoolUsed, xceiverCount, failedVolumes);
//check lease recovery
BlockInfoUnderConstruction[] blocks = nodeinfo
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1154042&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Thu Aug 4 22:55:48 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+
+/** Datanode statistics */
+public interface DatanodeStatistics {
+
+ /** @return the total capacity */
+ public long getCapacityTotal();
+
+ /** @return the used capacity */
+ public long getCapacityUsed();
+
+ /** @return the percentage of the used capacity over the total capacity. */
+ public float getCapacityUsedPercent();
+
+ /** @return the remaining capacity */
+ public long getCapacityRemaining();
+
+ /** @return the percentage of the remaining capacity over the total capacity. */
+ public float getCapacityRemainingPercent();
+
+ /** @return the block pool used. */
+ public long getBlockPoolUsed();
+
+ /** @return the percentage of the block pool used space over the total capacity. */
+ public float getPercentBlockPoolUsed();
+
+ /** @return the xceiver count */
+ public int getXceiverCount();
+
+ /**
+ * @return the total used space by data nodes for non-DFS purposes
+ * such as storing temporary files on the local file system
+ */
+ public long getCapacityUsedNonDFS();
+
+ /** The same as {@link ClientProtocol#getStats()}.
+ * The block related entries are set to -1.
+ */
+ public long[] getStats();
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Thu Aug 4 22:55:48 2011
@@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.util.CyclicIteration;
/**
* Manage node decommissioning.
@@ -35,11 +34,9 @@ class DecommissionManager {
static final Log LOG = LogFactory.getLog(DecommissionManager.class);
private final FSNamesystem fsnamesystem;
- private final BlockManager blockManager;
- DecommissionManager(FSNamesystem namesystem) {
+ DecommissionManager(final FSNamesystem namesystem) {
this.fsnamesystem = namesystem;
- this.blockManager = fsnamesystem.getBlockManager();
}
/** Periodically check decommission status. */
@@ -81,16 +78,16 @@ class DecommissionManager {
}
private void check() {
+ final DatanodeManager dm = fsnamesystem.getBlockManager().getDatanodeManager();
int count = 0;
for(Map.Entry<String, DatanodeDescriptor> entry
- : blockManager.getDatanodeManager().getDatanodeCyclicIteration(
- firstkey)) {
+ : dm.getDatanodeCyclicIteration(firstkey)) {
final DatanodeDescriptor d = entry.getValue();
firstkey = entry.getKey();
if (d.isDecommissionInProgress()) {
try {
- blockManager.checkDecommissionStateInternal(d);
+ dm.checkDecommissionState(d);
} catch(Exception e) {
LOG.warn("entry=" + entry, e);
}
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1154042&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Thu Aug 4 22:55:48 2011
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.util.Daemon;
+
+/**
+ * Manage the heartbeats received from datanodes.
+ * The datanode list and statistics are synchronized
+ * by the heartbeat manager lock.
+ */
+class HeartbeatManager implements DatanodeStatistics {
+ static final Log LOG = LogFactory.getLog(HeartbeatManager.class);
+
+ /**
+ * Stores a subset of the datanodeMap in DatanodeManager,
+ * containing nodes that are considered alive.
+ * The HeartbeatMonitor periodically checks for out-dated entries,
+ * and removes them from the list.
+ * It is synchronized by the heartbeat manager lock.
+ */
+ private final List<DatanodeDescriptor> datanodes = new ArrayList<DatanodeDescriptor>();
+
+ /** Statistics, which are synchronized by the heartbeat manager lock. */
+ private final Stats stats = new Stats();
+
+ /** The time period to check for expired datanodes */
+ private final long heartbeatRecheckInterval;
+ /** Heartbeat monitor thread */
+ private final Daemon heartbeatThread = new Daemon(new Monitor());
+
+ final FSNamesystem namesystem;
+
+ HeartbeatManager(final FSNamesystem namesystem, final Configuration conf) {
+ this.heartbeatRecheckInterval = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
+
+ this.namesystem = namesystem;
+ }
+
+ void activate(Configuration conf) {
+ heartbeatThread.start();
+ }
+
+ void close() {
+ heartbeatThread.interrupt();
+ }
+
+ synchronized int getLiveDatanodeCount() {
+ return datanodes.size();
+ }
+
+ @Override
+ public synchronized long getCapacityTotal() {
+ return stats.capacityTotal;
+ }
+
+ @Override
+ public synchronized long getCapacityUsed() {
+ return stats.capacityUsed;
+ }
+
+ @Override
+ public synchronized float getCapacityUsedPercent() {
+ return DFSUtil.getPercentUsed(stats.capacityUsed, stats.capacityTotal);
+ }
+
+ @Override
+ public synchronized long getCapacityRemaining() {
+ return stats.capacityRemaining;
+ }
+
+ @Override
+ public synchronized float getCapacityRemainingPercent() {
+ return DFSUtil.getPercentRemaining(
+ stats.capacityRemaining, stats.capacityTotal);
+ }
+
+ @Override
+ public synchronized long getBlockPoolUsed() {
+ return stats.blockPoolUsed;
+ }
+
+ @Override
+ public synchronized float getPercentBlockPoolUsed() {
+ return DFSUtil.getPercentUsed(stats.blockPoolUsed, stats.capacityTotal);
+ }
+
+ @Override
+ public synchronized long getCapacityUsedNonDFS() {
+ final long nonDFSUsed = stats.capacityTotal
+ - stats.capacityRemaining - stats.capacityUsed;
+ return nonDFSUsed < 0L? 0L : nonDFSUsed;
+ }
+
+ @Override
+ public synchronized int getXceiverCount() {
+ return stats.xceiverCount;
+ }
+
+ @Override
+ public synchronized long[] getStats() {
+ return new long[] {getCapacityTotal(),
+ getCapacityUsed(),
+ getCapacityRemaining(),
+ -1L,
+ -1L,
+ -1L,
+ getBlockPoolUsed()};
+ }
+
+ synchronized void register(final DatanodeDescriptor d) {
+ if (!datanodes.contains(d)) {
+ addDatanode(d);
+
+ //update its timestamp
+ d.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
+ }
+ }
+
+ synchronized DatanodeDescriptor[] getDatanodes() {
+ return datanodes.toArray(new DatanodeDescriptor[datanodes.size()]);
+ }
+
+ synchronized void addDatanode(final DatanodeDescriptor d) {
+ datanodes.add(d);
+ d.isAlive = true;
+ }
+
+ synchronized void removeDatanode(DatanodeDescriptor node) {
+ if (node.isAlive) {
+ stats.subtract(node);
+ datanodes.remove(node);
+ node.isAlive = false;
+ }
+ }
+
+ synchronized void updateHeartbeat(final DatanodeDescriptor node,
+ long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+ int xceiverCount, int failedVolumes) {
+ stats.subtract(node);
+ node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
+ xceiverCount, failedVolumes);
+ stats.add(node);
+ }
+
+ synchronized void startDecommission(final DatanodeDescriptor node) {
+ stats.subtract(node);
+ node.startDecommission();
+ stats.add(node);
+ }
+
+ synchronized void stopDecommission(final DatanodeDescriptor node) {
+ stats.subtract(node);
+ node.stopDecommission();
+ stats.add(node);
+ }
+
+ /**
+ * Check if there are any expired heartbeats, and if so,
+ * whether any blocks have to be re-replicated.
+ * While removing dead datanodes, make sure that only one datanode is marked
+ * dead at a time within the synchronized section. Otherwise, a cascading
+ * effect causes more datanodes to be declared dead.
+ */
+ void heartbeatCheck() {
+ final DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
+ // It's OK to check safe mode w/o taking the lock here, we re-check
+ // for safe mode after taking the lock before removing a datanode.
+ if (namesystem.isInSafeMode()) {
+ return;
+ }
+ boolean allAlive = false;
+ while (!allAlive) {
+ // locate the first dead node.
+ DatanodeID dead = null;
+ synchronized(this) {
+ for (DatanodeDescriptor d : datanodes) {
+ if (dm.isDatanodeDead(d)) {
+ namesystem.incrExpiredHeartbeats();
+ dead = d;
+ break;
+ }
+ }
+ }
+
+ allAlive = dead == null;
+ if (!allAlive) {
+ // acquire the fsnamesystem lock, and then remove the dead node.
+ namesystem.writeLock();
+ if (namesystem.isInSafeMode()) {
+ return;
+ }
+ try {
+ synchronized(this) {
+ dm.removeDeadDatanode(dead);
+ }
+ } finally {
+ namesystem.writeUnlock();
+ }
+ }
+ }
+ }
+
+
+ /** Periodically check heartbeat and update block key */
+ private class Monitor implements Runnable {
+ private long lastHeartbeatCheck;
+ private long lastBlockKeyUpdate;
+
+ @Override
+ public void run() {
+ while(namesystem.isRunning()) {
+ try {
+ final long now = Util.now();
+ if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
+ heartbeatCheck();
+ lastHeartbeatCheck = now;
+ }
+ if (namesystem.getBlockManager().shouldUpdateBlockKey(
+ now - lastBlockKeyUpdate)) {
+ synchronized(HeartbeatManager.this) {
+ for(DatanodeDescriptor d : datanodes) {
+ d.needKeyUpdate = true;
+ }
+ }
+ lastBlockKeyUpdate = now;
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while checking heartbeat", e);
+ }
+ try {
+ Thread.sleep(5000); // 5 seconds
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ }
+
+ /** Datanode statistics.
+ * For decommissioning/decommissioned nodes, only used capacity is counted.
+ */
+ private static class Stats {
+ private long capacityTotal = 0L;
+ private long capacityUsed = 0L;
+ private long capacityRemaining = 0L;
+ private long blockPoolUsed = 0L;
+ private int xceiverCount = 0;
+
+ private void add(final DatanodeDescriptor node) {
+ capacityUsed += node.getDfsUsed();
+ blockPoolUsed += node.getBlockPoolUsed();
+ xceiverCount += node.getXceiverCount();
+ if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ capacityTotal += node.getCapacity();
+ capacityRemaining += node.getRemaining();
+ } else {
+ capacityTotal += node.getDfsUsed();
+ }
+ }
+
+ private void subtract(final DatanodeDescriptor node) {
+ capacityUsed -= node.getDfsUsed();
+ blockPoolUsed -= node.getBlockPoolUsed();
+ xceiverCount -= node.getXceiverCount();
+ if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ capacityTotal -= node.getCapacity();
+ capacityRemaining -= node.getRemaining();
+ } else {
+ capacityTotal -= node.getDfsUsed();
+ }
+ }
+ }
+}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Aug 4 22:55:48 2011
@@ -86,7 +86,6 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -97,6 +96,7 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
@@ -207,9 +207,6 @@ public class FSNamesystem implements FSC
private PermissionStatus defaultPermission;
// FSNamesystemMetrics counter variables
@Metric private MutableCounterInt expiredHeartbeats;
- private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
- private long blockPoolUsed = 0L;
- private int totalLoad = 0;
// Scan interval is not configurable.
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
@@ -221,24 +218,17 @@ public class FSNamesystem implements FSC
//
public FSDirectory dir;
private BlockManager blockManager;
-
+ private DatanodeStatistics datanodeStatistics;
+
// Block pool ID used by this namenode
String blockPoolId;
- /**
- * Stores a subset of datanodeMap, containing nodes that are considered alive.
- * The HeartbeatMonitor periodically checks for out-dated entries,
- * and removes them from the list.
- */
- public ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
-
public LeaseManager leaseManager = new LeaseManager(this);
//
// Threaded object that checks to see if we have been
// getting heartbeats from all clients.
//
- Daemon hbthread = null; // HeartbeatMonitor thread
public Daemon lmthread = null; // LeaseMonitor thread
Daemon smmthread = null; // SafeModeMonitor thread
@@ -248,9 +238,6 @@ public class FSNamesystem implements FSC
private volatile boolean fsRunning = true;
long systemStart = 0;
- // heartbeatRecheckInterval is how often namenode checks for expired datanodes
- private long heartbeatRecheckInterval;
-
//resourceRecheckInterval is how often namenode checks for the disk space availability
private long resourceRecheckInterval;
@@ -303,6 +290,7 @@ public class FSNamesystem implements FSC
checkAvailableResources();
this.systemStart = now();
this.blockManager = new BlockManager(this, conf);
+ this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.fsLock = new ReentrantReadWriteLock(true); // fair locking
setConfigurationParameters(conf);
dtSecretManager = createDelegationTokenSecretManager(conf);
@@ -333,10 +321,7 @@ public class FSNamesystem implements FSC
void activate(Configuration conf) throws IOException {
setBlockTotal();
blockManager.activate(conf);
- this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(leaseManager.new Monitor());
-
- hbthread.start();
lmthread.start();
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
@@ -463,10 +448,6 @@ public class FSNamesystem implements FSC
DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
this.defaultPermission = PermissionStatus.createImmutable(
fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
-
- this.heartbeatRecheckInterval = conf.getInt(
- DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.serverDefaults = new FsServerDefaults(
conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
@@ -512,7 +493,6 @@ public class FSNamesystem implements FSC
fsRunning = false;
try {
if (blockManager != null) blockManager.close();
- if (hbthread != null) hbthread.interrupt();
if (smmthread != null) smmthread.interrupt();
if (dtSecretManager != null) dtSecretManager.stopThreads();
if (nnrmthread != null) nnrmthread.interrupt();
@@ -622,7 +602,7 @@ public class FSNamesystem implements FSC
* Set permissions for an existing file.
* @throws IOException
*/
- public void setPermission(String src, FsPermission permission)
+ void setPermission(String src, FsPermission permission)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
HdfsFileStatus resultingStat = null;
@@ -651,7 +631,7 @@ public class FSNamesystem implements FSC
* Set owner for an existing file.
* @throws IOException
*/
- public void setOwner(String src, String username, String group)
+ void setOwner(String src, String username, String group)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
HdfsFileStatus resultingStat = null;
@@ -818,12 +798,6 @@ public class FSNamesystem implements FSC
lastBlock, last.isComplete());
}
}
-
- /** Create a LocatedBlock. */
- public LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
- final long offset, final boolean corrupt) throws IOException {
- return new LocatedBlock(getExtendedBlock(b), locations, offset, corrupt);
- }
/**
@@ -1018,7 +992,7 @@ public class FSNamesystem implements FSC
/**
* Create a symbolic link.
*/
- public void createSymlink(String target, String link,
+ void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
HdfsFileStatus resultingStat = null;
@@ -1988,7 +1962,7 @@ public class FSNamesystem implements FSC
* @see ClientProtocol#delete(String, boolean) for detailed descriptoin and
* description of exceptions
*/
- public boolean delete(String src, boolean recursive)
+ boolean delete(String src, boolean recursive)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2118,7 +2092,7 @@ public class FSNamesystem implements FSC
/**
* Create all the necessary directories
*/
- public boolean mkdirs(String src, PermissionStatus permissions,
+ boolean mkdirs(String src, PermissionStatus permissions,
boolean createParent) throws IOException, UnresolvedLinkException {
boolean status = false;
if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2536,7 +2510,7 @@ public class FSNamesystem implements FSC
* @throws UnresolvedLinkException if symbolic link is encountered
* @throws IOException if other I/O error occurred
*/
- public DirectoryListing getListing(String src, byte[] startAfter,
+ DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation)
throws AccessControlException, UnresolvedLinkException, IOException {
DirectoryListing dl;
@@ -2606,7 +2580,7 @@ public class FSNamesystem implements FSC
* @see #registerDatanode(DatanodeRegistration)
* @return registration ID
*/
- public String getRegistrationID() {
+ String getRegistrationID() {
return Storage.getRegistrationID(dir.fsImage.getStorage());
}
@@ -2627,7 +2601,8 @@ public class FSNamesystem implements FSC
throws IOException {
readLock();
try {
- final int maxTransfer = blockManager.maxReplicationStreams - xmitsInProgress;
+ final int maxTransfer = blockManager.getMaxReplicationStreams()
+ - xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, maxTransfer, failedVolumes);
@@ -2655,35 +2630,6 @@ public class FSNamesystem implements FSC
}
}
- public void updateStats(DatanodeDescriptor node, boolean isAdded) {
- //
- // The statistics are protected by the heartbeat lock
- // For decommissioning/decommissioned nodes, only used capacity
- // is counted.
- //
- assert(Thread.holdsLock(heartbeats));
- if (isAdded) {
- capacityUsed += node.getDfsUsed();
- blockPoolUsed += node.getBlockPoolUsed();
- totalLoad += node.getXceiverCount();
- if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
- capacityTotal += node.getCapacity();
- capacityRemaining += node.getRemaining();
- } else {
- capacityTotal += node.getDfsUsed();
- }
- } else {
- capacityUsed -= node.getDfsUsed();
- blockPoolUsed -= node.getBlockPoolUsed();
- totalLoad -= node.getXceiverCount();
- if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
- capacityTotal -= node.getCapacity();
- capacityRemaining -= node.getRemaining();
- } else {
- capacityTotal -= node.getDfsUsed();
- }
- }
- }
/**
* Returns whether or not there were available resources at the last check of
@@ -2735,86 +2681,7 @@ public class FSNamesystem implements FSC
}
}
}
-
-
- /**
- * Periodically calls heartbeatCheck() and updateBlockKey()
- */
- class HeartbeatMonitor implements Runnable {
- private long lastHeartbeatCheck;
- private long lastBlockKeyUpdate;
- /**
- */
- public void run() {
- while (fsRunning) {
- try {
- long now = now();
- if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
- heartbeatCheck();
- lastHeartbeatCheck = now;
- }
- if (blockManager.isBlockTokenEnabled()
- && (lastBlockKeyUpdate + blockManager.getBlockKeyUpdateInterval() < now)) {
- blockManager.updateBlockKey();
- lastBlockKeyUpdate = now;
- }
- } catch (Exception e) {
- FSNamesystem.LOG.error("Exception while checking heartbeat", e);
- }
- try {
- Thread.sleep(5000); // 5 seconds
- } catch (InterruptedException ie) {
- }
- }
- }
- }
-
-
- public void setNodeReplicationLimit(int limit) {
- blockManager.maxReplicationStreams = limit;
- }
-
- /**
- * Remove a datanode descriptor.
- * @param nodeID datanode ID.
- * @throws UnregisteredNodeException
- */
- public void removeDatanode(final DatanodeID nodeID
- ) throws UnregisteredNodeException {
- writeLock();
- try {
- DatanodeDescriptor nodeInfo = getBlockManager().getDatanodeManager(
- ).getDatanode(nodeID);
- if (nodeInfo != null) {
- removeDatanode(nodeInfo);
- } else {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
- + nodeID.getName() + " does not exist");
- }
- } finally {
- writeUnlock();
- }
- }
- /**
- * Remove a datanode descriptor.
- * @param nodeInfo datanode descriptor.
- */
- public void removeDatanode(DatanodeDescriptor nodeInfo) {
- assert hasWriteLock();
- synchronized (heartbeats) {
- if (nodeInfo.isAlive) {
- updateStats(nodeInfo, false);
- heartbeats.remove(nodeInfo);
- nodeInfo.isAlive = false;
- }
- }
-
- blockManager.removeDatanode(nodeInfo);
-
- checkSafeMode();
- }
-
FSImage getFSImage() {
return dir.fsImage;
}
@@ -2822,61 +2689,12 @@ public class FSNamesystem implements FSC
FSEditLog getEditLog() {
return getFSImage().getEditLog();
}
-
- /**
- * Check if there are any expired heartbeats, and if so,
- * whether any blocks have to be re-replicated.
- * While removing dead datanodes, make sure that only one datanode is marked
- * dead at a time within the synchronized section. Otherwise, a cascading
- * effect causes more datanodes to be declared dead.
- */
- void heartbeatCheck() {
- final DatanodeManager datanodeManager = getBlockManager().getDatanodeManager();
- // It's OK to check safe mode w/o taking the lock here, we re-check
- // for safe mode after taking the lock before removing a datanode.
- if (isInSafeMode()) {
- return;
- }
- boolean allAlive = false;
- while (!allAlive) {
- boolean foundDead = false;
- DatanodeID nodeID = null;
-
- // locate the first dead node.
- synchronized(heartbeats) {
- for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
- it.hasNext();) {
- DatanodeDescriptor nodeInfo = it.next();
- if (datanodeManager.isDatanodeDead(nodeInfo)) {
- expiredHeartbeats.incr();
- foundDead = true;
- nodeID = nodeInfo;
- break;
- }
- }
- }
-
- // acquire the fsnamesystem lock, and then remove the dead node.
- if (foundDead) {
- writeLock();
- if (isInSafeMode()) {
- return;
- }
- try {
- datanodeManager.removeDeadDatanode(nodeID);
- } finally {
- writeUnlock();
- }
- }
- allAlive = !foundDead;
- }
- }
/**
* The given node is reporting all its blocks. Use this info to
* update the (machine-->blocklist) and (block-->machinelist) tables.
*/
- public void processReport(DatanodeID nodeID, String poolId,
+ void processReport(DatanodeID nodeID, String poolId,
BlockListAsLongs newReport) throws IOException {
long startTime, endTime;
@@ -3057,15 +2875,18 @@ public class FSNamesystem implements FSC
return blockManager.getMissingBlocksCount();
}
+ /** Increment expired heartbeat counter. */
+ public void incrExpiredHeartbeats() {
+ expiredHeartbeats.incr();
+ }
+
+ /** @see ClientProtocol#getStats() */
long[] getStats() {
- synchronized(heartbeats) {
- return new long[] {this.capacityTotal, this.capacityUsed,
- this.capacityRemaining,
- getUnderReplicatedBlocks(),
- getCorruptReplicaBlocks(),
- getMissingBlocksCount(),
- getBlockPoolUsedSpace()};
- }
+ final long[] stats = datanodeStatistics.getStats();
+ stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = getUnderReplicatedBlocks();
+ stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = getCorruptReplicaBlocks();
+ stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = getMissingBlocksCount();
+ return stats;
}
/**
@@ -3073,9 +2894,7 @@ public class FSNamesystem implements FSC
*/
@Override // FSNamesystemMBean
public long getCapacityTotal() {
- synchronized(heartbeats) {
- return capacityTotal;
- }
+ return datanodeStatistics.getCapacityTotal();
}
@Metric
@@ -3088,9 +2907,7 @@ public class FSNamesystem implements FSC
*/
@Override // FSNamesystemMBean
public long getCapacityUsed() {
- synchronized(heartbeats) {
- return capacityUsed;
- }
+ return datanodeStatistics.getCapacityUsed();
}
@Metric
@@ -3098,32 +2915,9 @@ public class FSNamesystem implements FSC
return DFSUtil.roundBytesToGB(getCapacityUsed());
}
- /**
- * Total used space by data nodes as percentage of total capacity
- */
- public float getCapacityUsedPercent() {
- synchronized(heartbeats){
- return DFSUtil.getPercentUsed(capacityUsed, capacityTotal);
- }
- }
- /**
- * Total used space by data nodes for non DFS purposes such
- * as storing temporary files on the local file system
- */
- public long getCapacityUsedNonDFS() {
- long nonDFSUsed = 0;
- synchronized(heartbeats){
- nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed;
- }
- return nonDFSUsed < 0 ? 0 : nonDFSUsed;
- }
- /**
- * Total non-used raw bytes.
- */
+ @Override
public long getCapacityRemaining() {
- synchronized(heartbeats) {
- return capacityRemaining;
- }
+ return datanodeStatistics.getCapacityRemaining();
}
@Metric
@@ -3132,22 +2926,12 @@ public class FSNamesystem implements FSC
}
/**
- * Total remaining space by data nodes as percentage of total capacity
- */
- public float getCapacityRemainingPercent() {
- synchronized(heartbeats){
- return DFSUtil.getPercentRemaining(capacityRemaining, capacityTotal);
- }
- }
- /**
* Total number of connections.
*/
@Override // FSNamesystemMBean
@Metric
public int getTotalLoad() {
- synchronized (heartbeats) {
- return this.totalLoad;
- }
+ return datanodeStatistics.getXceiverCount();
}
int getNumberOfDatanodes(DatanodeReportType type) {
@@ -3757,8 +3541,9 @@ public class FSNamesystem implements FSC
}
return isInSafeMode();
}
-
- private void checkSafeMode() {
+
+ /** Check and trigger safe mode. */
+ public void checkSafeMode() {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode != null) {
@@ -4131,11 +3916,6 @@ public class FSNamesystem implements FSC
return blockManager.getUnderReplicatedBlocksCount();
}
- /** Return number of under-replicated but not missing blocks */
- public long getUnderReplicatedNotMissingBlocks() {
- return blockManager.getUnderReplicatedNotMissingBlocks();
- }
-
/** Returns number of blocks with corrupt replicas */
@Metric({"CorruptBlocks", "Number of blocks with corrupt replicas"})
public long getCorruptReplicaBlocks() {
@@ -4207,14 +3987,14 @@ public class FSNamesystem implements FSC
/**
* Sets the generation stamp for this filesystem
*/
- public void setGenerationStamp(long stamp) {
+ void setGenerationStamp(long stamp) {
generationStamp.setStamp(stamp);
}
/**
* Gets the generation stamp for this filesystem
*/
- public long getGenerationStamp() {
+ long getGenerationStamp() {
return generationStamp.getStamp();
}
@@ -4854,31 +4634,27 @@ public class FSNamesystem implements FSC
@Override // NameNodeMXBean
public long getNonDfsUsedSpace() {
- return getCapacityUsedNonDFS();
+ return datanodeStatistics.getCapacityUsedNonDFS();
}
@Override // NameNodeMXBean
public float getPercentUsed() {
- return getCapacityUsedPercent();
+ return datanodeStatistics.getCapacityUsedPercent();
}
@Override // NameNodeMXBean
public long getBlockPoolUsedSpace() {
- synchronized(heartbeats) {
- return blockPoolUsed;
- }
+ return datanodeStatistics.getBlockPoolUsed();
}
@Override // NameNodeMXBean
public float getPercentBlockPoolUsed() {
- synchronized(heartbeats) {
- return DFSUtil.getPercentUsed(blockPoolUsed, capacityTotal);
- }
+ return datanodeStatistics.getPercentBlockPoolUsed();
}
@Override // NameNodeMXBean
public float getPercentRemaining() {
- return getCapacityRemainingPercent();
+ return datanodeStatistics.getCapacityRemainingPercent();
}
@Override // NameNodeMXBean
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Aug 4 22:55:48 2011
@@ -1229,7 +1229,7 @@ public class NameNode implements Namenod
LOG.warn("Disk error on " + dnName + ": " + msg);
} else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
LOG.warn("Fatal disk error on " + dnName + ": " + msg);
- namesystem.removeDatanode(nodeReg);
+ namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);
} else {
LOG.info("Error report from " + dnName + ": " + msg);
}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Thu Aug 4 22:55:48 2011
@@ -349,7 +349,7 @@ class NamenodeJspHelper {
+ colTxt() + ":" + colTxt() + decommissioning.size()
+ rowTxt() + colTxt("Excludes missing blocks.")
+ "Number of Under-Replicated Blocks" + colTxt() + ":" + colTxt()
- + fsn.getUnderReplicatedNotMissingBlocks()
+ + fsn.getBlockManager().getUnderReplicatedNotMissingBlocks()
+ "</table></div><br>\n");
if (live.isEmpty() && dead.isEmpty()) {
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java Thu Aug 4 22:55:48 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.ChecksumExce
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
/**
* The test makes sure that NameNode detects presense blocks that do not have
@@ -56,6 +57,7 @@ public class TestMissingBlocksAlert exte
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
+ final BlockManager bm = cluster.getNamesystem().getBlockManager();
DistributedFileSystem dfs =
(DistributedFileSystem) cluster.getFileSystem();
@@ -86,8 +88,7 @@ public class TestMissingBlocksAlert exte
}
assertTrue(dfs.getMissingBlocksCount() == 1);
assertEquals(4, dfs.getUnderReplicatedBlocksCount());
- assertEquals(3,
- cluster.getNamesystem().getUnderReplicatedNotMissingBlocks());
+ assertEquals(3, bm.getUnderReplicatedNotMissingBlocks());
// Now verify that it shows up on webui
@@ -109,8 +110,7 @@ public class TestMissingBlocksAlert exte
}
assertEquals(2, dfs.getUnderReplicatedBlocksCount());
- assertEquals(2,
- cluster.getNamesystem().getUnderReplicatedNotMissingBlocks());
+ assertEquals(2, bm.getUnderReplicatedNotMissingBlocks());
// and make sure WARNING disappears
// Now verify that it shows up on webui
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Thu Aug 4 22:55:48 2011
@@ -25,10 +25,13 @@ import java.util.Set;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.Daemon;
public class BlockManagerTestUtil {
+ public static void setNodeReplicationLimit(final BlockManager blockManager,
+ final int limit) {
+ blockManager.maxReplicationStreams = limit;
+ }
/** @return the datanode descriptor for the given the given storageID. */
public static DatanodeDescriptor getDatanode(final FSNamesystem ns,
Copied: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java (from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java?p2=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java&p1=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java&r1=1154040&r2=1154042&rev=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java Thu Aug 4 22:55:48 2011
@@ -16,10 +16,16 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
@@ -31,18 +37,14 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.log4j.Level;
-
-import static org.junit.Assert.*;
import org.junit.Test;
public class TestBlocksWithNotEnoughRacks {
public static final Log LOG = LogFactory.getLog(TestBlocksWithNotEnoughRacks.class);
static {
- ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
}
@@ -278,6 +280,7 @@ public class TestBlocksWithNotEnoughRack
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+ final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
try {
// Create a file with one block with a replication factor of 2
@@ -293,7 +296,7 @@ public class TestBlocksWithNotEnoughRack
DataNode dataNode = datanodes.get(idx);
DatanodeID dnId = dataNode.getDatanodeId();
cluster.stopDataNode(idx);
- ns.removeDatanode(dnId);
+ dm.removeDatanode(dnId);
// The block should still have sufficient # replicas, across racks.
// The last node may not have contained a replica, but if it did
@@ -307,7 +310,7 @@ public class TestBlocksWithNotEnoughRack
dataNode = datanodes.get(idx);
dnId = dataNode.getDatanodeId();
cluster.stopDataNode(idx);
- ns.removeDatanode(dnId);
+ dm.removeDatanode(dnId);
// Make sure we have enough live replicas even though we are
// short one rack and therefore need one replica
@@ -332,6 +335,7 @@ public class TestBlocksWithNotEnoughRack
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+ final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
try {
// Create a file with one block
@@ -347,7 +351,7 @@ public class TestBlocksWithNotEnoughRack
DataNode dataNode = datanodes.get(2);
DatanodeID dnId = dataNode.getDatanodeId();
cluster.stopDataNode(2);
- ns.removeDatanode(dnId);
+ dm.removeDatanode(dnId);
// The block gets re-replicated to another datanode so it has a
// sufficient # replicas, but not across racks, so there should
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java Thu Aug 4 22:55:48 2011
@@ -45,8 +45,8 @@ public class TestComputeInvalidateWork e
final FSNamesystem namesystem = cluster.getNamesystem();
final BlockManager bm = namesystem.getBlockManager();
final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit;
- DatanodeDescriptor[] nodes =
- namesystem.heartbeats.toArray(new DatanodeDescriptor[NUM_OF_DATANODES]);
+ final DatanodeDescriptor[] nodes = bm.getDatanodeManager(
+ ).getHeartbeatManager().getDatanodes();
assertEquals(nodes.length, NUM_OF_DATANODES);
namesystem.writeLock();
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java Thu Aug 4 22:55:48 2011
@@ -52,6 +52,8 @@ public class TestHeartbeatHandling exten
try {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
+ final HeartbeatManager hm = namesystem.getBlockManager(
+ ).getDatanodeManager().getHeartbeatManager();
final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
@@ -69,7 +71,7 @@ public class TestHeartbeatHandling exten
try {
namesystem.writeLock();
- synchronized (namesystem.heartbeats) {
+ synchronized(hm) {
for (int i=0; i<MAX_REPLICATE_BLOCKS; i++) {
dd.addBlockToBeReplicated(
new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP), ONE_TARGET);
Copied: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java (from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java?p2=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java&p1=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java&r1=1154040&r2=1154042&rev=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java Thu Aug 4 22:55:48 2011
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Collection;
import java.util.Iterator;
@@ -33,7 +33,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
/**
* Test if live nodes count per node is correct
@@ -57,6 +59,8 @@ public class TestNodeCount extends TestC
new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).build();
try {
final FSNamesystem namesystem = cluster.getNamesystem();
+ final BlockManager bm = namesystem.getBlockManager();
+ final HeartbeatManager hm = bm.getDatanodeManager().getHeartbeatManager();
final FileSystem fs = cluster.getFileSystem();
// populate the cluster with a one block file
@@ -66,8 +70,7 @@ public class TestNodeCount extends TestC
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
// keep a copy of all datanode descriptor
- DatanodeDescriptor[] datanodes =
- namesystem.heartbeats.toArray(new DatanodeDescriptor[REPLICATION_FACTOR]);
+ final DatanodeDescriptor[] datanodes = hm.getDatanodes();
// start two new nodes
cluster.startDataNodes(conf, 2, true, null, null);
@@ -80,9 +83,9 @@ public class TestNodeCount extends TestC
// make sure that NN detects that the datanode is down
try {
namesystem.writeLock();
- synchronized (namesystem.heartbeats) {
+ synchronized (hm) {
datanode.setLastUpdate(0); // mark it dead
- namesystem.heartbeatCheck();
+ hm.heartbeatCheck();
}
} finally {
namesystem.writeUnlock();
@@ -102,12 +105,12 @@ public class TestNodeCount extends TestC
}
// find out a non-excess node
- Iterator<DatanodeDescriptor> iter = namesystem.getBlockManager().blocksMap
+ final Iterator<DatanodeDescriptor> iter = bm.blocksMap
.nodeIterator(block.getLocalBlock());
DatanodeDescriptor nonExcessDN = null;
while (iter.hasNext()) {
DatanodeDescriptor dn = iter.next();
- Collection<Block> blocks = namesystem.getBlockManager().excessReplicateMap.get(dn.getStorageID());
+ Collection<Block> blocks = bm.excessReplicateMap.get(dn.getStorageID());
if (blocks == null || !blocks.contains(block) ) {
nonExcessDN = dn;
break;
@@ -121,9 +124,9 @@ public class TestNodeCount extends TestC
try {
namesystem.writeLock();
- synchronized (namesystem.heartbeats) {
+ synchronized(hm) {
nonExcessDN.setLastUpdate(0); // mark it dead
- namesystem.heartbeatCheck();
+ hm.heartbeatCheck();
}
} finally {
namesystem.writeUnlock();
Copied: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java (from r1154040, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java?p2=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java&p1=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java&r1=1154040&r2=1154042&rev=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java Thu Aug 4 22:55:48 2011
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.File;
import java.io.IOException;
@@ -34,7 +34,9 @@ import org.apache.hadoop.hdfs.TestDatano
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
public class TestOverReplicatedBlocks extends TestCase {
/** Test processOverReplicatedBlock can handle corrupt replicas fine.
@@ -83,13 +85,15 @@ public class TestOverReplicatedBlocks ex
cluster.getDataNodes().get(2), blockPoolId);
final FSNamesystem namesystem = cluster.getNamesystem();
+ final BlockManager bm = namesystem.getBlockManager();
+ final HeartbeatManager hm = bm.getDatanodeManager().getHeartbeatManager();
try {
namesystem.writeLock();
- synchronized (namesystem.heartbeats) {
+ synchronized(hm) {
// set live datanode's remaining space to be 0
// so they will be chosen to be deleted when over-replication occurs
String corruptMachineName = corruptDataNode.getName();
- for (DatanodeDescriptor datanode : namesystem.heartbeats) {
+ for (DatanodeDescriptor datanode : hm.getDatanodes()) {
if (!corruptMachineName.equals(datanode.getName())) {
datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0, 0);
}
@@ -100,8 +104,7 @@ public class TestOverReplicatedBlocks ex
// corrupt one won't be chosen to be excess one
// without 4910 the number of live replicas would be 0: block gets lost
- assertEquals(1, namesystem.getBlockManager().countNodes(block.getLocalBlock())
- .liveReplicas());
+ assertEquals(1, bm.countNodes(block.getLocalBlock()).liveReplicas());
}
} finally {
namesystem.writeUnlock();
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Thu Aug 4 22:55:48 2011
@@ -1128,7 +1128,8 @@ public class NNThroughputBenchmark {
// decommission data-nodes
decommissionNodes();
// set node replication limit
- namesystem.setNodeReplicationLimit(nodeReplicationLimit);
+ BlockManagerTestUtil.setNodeReplicationLimit(namesystem.getBlockManager(),
+ nodeReplicationLimit);
}
private void decommissionNodes() throws IOException {
@@ -1171,9 +1172,7 @@ public class NNThroughputBenchmark {
void printResults() {
String blockDistribution = "";
String delim = "(";
- int totalReplicas = 0;
for(int idx=0; idx < blockReportObject.getNumDatanodes(); idx++) {
- totalReplicas += blockReportObject.datanodes[idx].nrBlocks;
blockDistribution += delim + blockReportObject.datanodes[idx].nrBlocks;
delim = ", ";
}
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java?rev=1154042&r1=1154041&r2=1154042&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java Thu Aug 4 22:55:48 2011
@@ -112,10 +112,10 @@ public class TestNamenodeCapacityReport
configCapacity = namesystem.getCapacityTotal();
used = namesystem.getCapacityUsed();
- nonDFSUsed = namesystem.getCapacityUsedNonDFS();
+ nonDFSUsed = namesystem.getNonDfsUsedSpace();
remaining = namesystem.getCapacityRemaining();
- percentUsed = namesystem.getCapacityUsedPercent();
- percentRemaining = namesystem.getCapacityRemainingPercent();
+ percentUsed = namesystem.getPercentUsed();
+ percentRemaining = namesystem.getPercentRemaining();
bpUsed = namesystem.getBlockPoolUsedSpace();
percentBpUsed = namesystem.getPercentBlockPoolUsed();