Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/07/27 07:46:55 UTC
svn commit: r1151339 - in /hadoop/common/trunk/hdfs: ./
src/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/server/blockmana...
Author: szetszwo
Date: Wed Jul 27 05:46:52 2011
New Revision: 1151339
URL: http://svn.apache.org/viewvc?rev=1151339&view=rev
Log:
HDFS-2191. Move datanodeMap from FSNamesystem to DatanodeManager.
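In caller terms, state that used to be read directly off FSNamesystem (datanodeMap, blockInvalidateLimit, getDatanode, isDatanodeDead) is now reached through BlockManager and DatanodeManager. A minimal sketch of the new access path, assuming a live FSNamesystem reference; the surrounding setup is hypothetical, while the class and method names come from the diff below:

    // Hypothetical caller; fsn is an FSNamesystem obtained elsewhere.
    DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
    DatanodeDescriptor node = dm.getDatanode(nodeID); // was: fsn.getDatanode(nodeID);
                                                      // may throw UnregisteredNodeException
    int limit = dm.blockInvalidateLimit;              // was: fsn.blockInvalidateLimit;
                                                      // package-private, same-package callers
                                                      // only, as in the tests below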
Added:
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
- copied, changed from r1151310, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
Removed:
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
Modified:
hadoop/common/trunk/hdfs/CHANGES.txt
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Wed Jul 27 05:46:52 2011
@@ -604,6 +604,9 @@ Trunk (unreleased changes)
HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp
implementations. (Ivan Kelly via todd)
+ HDFS-2191. Move datanodeMap from FSNamesystem to DatanodeManager.
+ (szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Jul 27 05:46:52 2011
@@ -42,9 +42,11 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
@@ -53,6 +55,8 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon;
@@ -156,7 +160,7 @@ public class BlockManager {
public final int defaultReplication;
/** The maximum number of entries returned by getCorruptInodes() */
final int maxCorruptFilesReturned;
-
+
/** variable to enable check for enough racks */
final boolean shouldCheckForEnoughRacks;
@@ -294,15 +298,14 @@ public class BlockManager {
}
}
- //
// Dump blocks from pendingReplication
- //
pendingReplications.metaSave(out);
- //
// Dump blocks that are waiting to be deleted
- //
dumpRecentInvalidateSets(out);
+
+ // Dump all datanodes
+ getDatanodeManager().datanodeDump(out);
}
/**
@@ -453,7 +456,7 @@ public class BlockManager {
/**
* Get all valid locations of the block
*/
- public ArrayList<String> getValidLocations(Block block) {
+ private List<String> getValidLocations(Block block) {
ArrayList<String> machineSet =
new ArrayList<String>(blocksMap.numNodes(block));
for(Iterator<DatanodeDescriptor> it =
@@ -562,6 +565,49 @@ public class BlockManager {
minReplication);
}
+ /** Get all blocks with location information from a datanode. */
+ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
+ final long size) throws UnregisteredNodeException {
+ final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
+ if (node == null) {
+ NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+ + "Asking for blocks from an unrecorded node " + datanode.getName());
+ throw new HadoopIllegalArgumentException(
+ "Datanode " + datanode.getName() + " not found.");
+ }
+
+ int numBlocks = node.numBlocks();
+ if(numBlocks == 0) {
+ return new BlocksWithLocations(new BlockWithLocations[0]);
+ }
+ Iterator<BlockInfo> iter = node.getBlockIterator();
+ int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
+ // skip blocks
+ for(int i=0; i<startBlock; i++) {
+ iter.next();
+ }
+ List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+ long totalSize = 0;
+ BlockInfo curBlock;
+ while(totalSize<size && iter.hasNext()) {
+ curBlock = iter.next();
+ if(!curBlock.isComplete()) continue;
+ totalSize += addBlock(curBlock, results);
+ }
+ if(totalSize<size) {
+ iter = node.getBlockIterator(); // start from the beginning
+ for(int i=0; i<startBlock&&totalSize<size; i++) {
+ curBlock = iter.next();
+ if(!curBlock.isComplete()) continue;
+ totalSize += addBlock(curBlock, results);
+ }
+ }
+
+ return new BlocksWithLocations(
+ results.toArray(new BlockWithLocations[results.size()]));
+ }
+
+
/** Remove a datanode. */
public void removeDatanode(final DatanodeDescriptor node) {
final Iterator<? extends Block> it = node.getBlockIterator();
@@ -660,7 +706,7 @@ public class BlockManager {
for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
Collection<Block> blocks = entry.getValue();
if (blocks.size() > 0) {
- out.println(namesystem.getDatanode(entry.getKey()).getName() + blocks);
+ out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
}
}
}
@@ -684,7 +730,7 @@ public class BlockManager {
private void markBlockAsCorrupt(BlockInfo storedBlock,
DatanodeInfo dn) throws IOException {
assert storedBlock != null : "storedBlock should not be null";
- DatanodeDescriptor node = namesystem.getDatanode(dn);
+ DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
throw new IOException("Cannot mark block " +
storedBlock.getBlockName() +
@@ -723,7 +769,7 @@ public class BlockManager {
throws IOException {
NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
+ blk + " on " + dn.getName());
- DatanodeDescriptor node = namesystem.getDatanode(dn);
+ DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
throw new IOException("Cannot invalidate block " + blk +
" because datanode " + dn.getName() +
@@ -748,7 +794,7 @@ public class BlockManager {
}
}
- public void updateState() {
+ void updateState() {
pendingReplicationBlocksCount = pendingReplications.size();
underReplicatedBlocksCount = neededReplications.size();
corruptReplicaBlocksCount = corruptReplicas.size();
@@ -1134,7 +1180,7 @@ public class BlockManager {
* If there were any replication requests that timed out, reap them
* and put them back into the neededReplication queue
*/
- public void processPendingReplications() {
+ private void processPendingReplications() {
Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
if (timedOutItems != null) {
namesystem.writeLock();
@@ -1700,6 +1746,7 @@ public class BlockManager {
addedNode, delNodeHint, blockplacement);
}
+
public void addToExcessReplicate(DatanodeInfo dn, Block block) {
assert namesystem.hasWriteLock();
Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
@@ -1774,6 +1821,21 @@ public class BlockManager {
}
/**
+ * Get all valid locations of the block and add the block to results.
+ * @return the length of the added block; 0 if the block is not added
+ */
+ private long addBlock(Block block, List<BlockWithLocations> results) {
+ final List<String> machineSet = getValidLocations(block);
+ if(machineSet.size() == 0) {
+ return 0;
+ } else {
+ results.add(new BlockWithLocations(block,
+ machineSet.toArray(new String[machineSet.size()])));
+ return block.getNumBytes();
+ }
+ }
+
+ /**
* The given node is reporting that it received a certain block.
*/
public void addBlock(DatanodeDescriptor node, Block block, String delHint)
@@ -1784,7 +1846,7 @@ public class BlockManager {
// get the deletion hint node
DatanodeDescriptor delHintNode = null;
if (delHint != null && delHint.length() != 0) {
- delHintNode = namesystem.getDatanode(delHint);
+ delHintNode = datanodeManager.getDatanode(delHint);
if (delHintNode == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
+ block + " is expected to be removed from an unrecorded node "
@@ -2071,7 +2133,7 @@ public class BlockManager {
return 0;
// get blocks to invalidate for the nodeId
assert nodeId != null;
- DatanodeDescriptor dn = namesystem.getDatanode(nodeId);
+ final DatanodeDescriptor dn = datanodeManager.getDatanode(nodeId);
if (dn == null) {
removeFromInvalidates(nodeId);
return 0;
@@ -2082,11 +2144,11 @@ public class BlockManager {
return 0;
ArrayList<Block> blocksToInvalidate = new ArrayList<Block>(
- namesystem.blockInvalidateLimit);
+ getDatanodeManager().blockInvalidateLimit);
// # blocks that can be sent in one message is limited
Iterator<Block> it = invalidateSet.iterator();
- for (int blkCount = 0; blkCount < namesystem.blockInvalidateLimit
+ for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit
&& it.hasNext(); blkCount++) {
blocksToInvalidate.add(it.next());
it.remove();
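The getBlocksWithLocations method added above walks a datanode's block list from a random offset, collecting complete blocks until roughly size bytes are gathered, then wraps around to the skipped prefix if needed. A self-contained sketch of that traversal pattern, with blocks reduced to their byte sizes (class and method names here are illustrative, not from the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    class RandomStartScan {
      // Models the random-start, wrap-around scan of
      // BlockManager.getBlocksWithLocations(); each long is a block size.
      static List<Long> collect(List<Long> blockSizes, long targetSize) {
        final List<Long> results = new ArrayList<Long>();
        if (blockSizes.isEmpty()) {
          return results;
        }
        final int start = new Random().nextInt(blockSizes.size());
        long total = 0;
        // First pass: from the random start point to the end of the list.
        for (int i = start; i < blockSizes.size() && total < targetSize; i++) {
          results.add(blockSizes.get(i));
          total += blockSizes.get(i);
        }
        // Second pass: wrap around over the blocks skipped before the start.
        for (int i = 0; i < start && total < targetSize; i++) {
          results.add(blockSizes.get(i));
          total += blockSizes.get(i);
        }
        return results;
      }
    }

Starting at a random offset spreads repeated getBlocks() calls over the whole block list instead of always returning the same leading blocks.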
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Jul 27 05:46:52 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
+import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.NavigableMap;
import java.util.Set;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,12 +38,22 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.ipc.Server;
@@ -48,6 +61,7 @@ import org.apache.hadoop.net.CachedDNSTo
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.CyclicIteration;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.ReflectionUtils;
@@ -62,6 +76,30 @@ public class DatanodeManager {
final FSNamesystem namesystem;
+ /**
+ * Stores the datanode -> block map.
+ * <p>
+ * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
+ * storage id. In order to keep the storage map consistent it tracks
+ * all storages ever registered with the namenode.
+ * A descriptor corresponding to a specific storage id can be
+ * <ul>
+ * <li>added to the map if it is a new storage id;</li>
+ * <li>updated with a new datanode started as a replacement for the old one
+ * with the same storage id; and </li>
+ * <li>removed if and only if an existing datanode is restarted to serve a
+ * different storage id.</li>
+ * </ul> <br>
+ * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
+ * in the namespace image file. Only the {@link DatanodeInfo} part is
+ * persistent, the list of blocks is restored from the datanode block
+ * reports.
+ * <p>
+ * Mapping: StorageID -> DatanodeDescriptor
+ */
+ private final NavigableMap<String, DatanodeDescriptor> datanodeMap
+ = new TreeMap<String, DatanodeDescriptor>();
+
/** Cluster network topology */
private final NetworkTopology networktopology = new NetworkTopology();
@@ -71,7 +109,12 @@ public class DatanodeManager {
private final DNSToSwitchMapping dnsToSwitchMapping;
/** Read include/exclude files*/
- private final HostsFileReader hostsReader;
+ private final HostsFileReader hostsReader;
+
+ /** The period to wait for datanode heartbeat.*/
+ private final long heartbeatExpireInterval;
+ /** Ask Datanode only up to this many blocks to delete. */
+ final int blockInvalidateLimit;
DatanodeManager(final FSNamesystem namesystem, final Configuration conf
) throws IOException {
@@ -90,6 +133,19 @@ public class DatanodeManager {
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
}
+
+ final long heartbeatIntervalSeconds = conf.getLong(
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+ final int heartbeatRecheckInterval = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
+ this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
+ + 10 * 1000 * heartbeatIntervalSeconds;
+ this.blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds),
+ DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
+ LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ + "=" + this.blockInvalidateLimit);
}
private Daemon decommissionthread = null;
@@ -124,20 +180,88 @@ public class DatanodeManager {
Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
}
}
-
+
+ CyclicIteration<String, DatanodeDescriptor> getDatanodeCyclicIteration(
+ final String firstkey) {
+ return new CyclicIteration<String, DatanodeDescriptor>(
+ datanodeMap, firstkey);
+ }
+
/** @return the datanode descriptor for the host. */
public DatanodeDescriptor getDatanodeByHost(final String host) {
return host2DatanodeMap.getDatanodeByHost(host);
}
+ /** Get a datanode descriptor given corresponding storageID */
+ DatanodeDescriptor getDatanode(final String storageID) {
+ return datanodeMap.get(storageID);
+ }
+
+ /**
+ * Get data node by storage ID.
+ *
+ * @param nodeID
+ * @return DatanodeDescriptor or null if the node is not found.
+ * @throws UnregisteredNodeException
+ */
+ public DatanodeDescriptor getDatanode(DatanodeID nodeID
+ ) throws UnregisteredNodeException {
+ final DatanodeDescriptor node = getDatanode(nodeID.getStorageID());
+ if (node == null)
+ return null;
+ if (!node.getName().equals(nodeID.getName())) {
+ final UnregisteredNodeException e = new UnregisteredNodeException(
+ nodeID, node);
+ NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
+ + e.getLocalizedMessage());
+ throw e;
+ }
+ return node;
+ }
+
+ /** Prints information about all datanodes. */
+ void datanodeDump(final PrintWriter out) {
+ synchronized (datanodeMap) {
+ out.println("Metasave: Number of datanodes: " + datanodeMap.size());
+ for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
+ DatanodeDescriptor node = it.next();
+ out.println(node.dumpDatanode());
+ }
+ }
+ }
+
+ /** Remove a dead datanode. */
+ public void removeDeadDatanode(final DatanodeID nodeID) {
+ synchronized(namesystem.heartbeats) {
+ synchronized(datanodeMap) {
+ DatanodeDescriptor d;
+ try {
+ d = getDatanode(nodeID);
+ } catch(IOException e) {
+ d = null;
+ }
+ if (d != null && isDatanodeDead(d)) {
+ NameNode.stateChangeLog.info(
+ "BLOCK* removeDeadDatanode: lost heartbeat from " + d.getName());
+ namesystem.removeDatanode(d);
+ }
+ }
+ }
+ }
+
+ /** Is the datanode dead? */
+ public boolean isDatanodeDead(DatanodeDescriptor node) {
+ return (node.getLastUpdate() <
+ (Util.now() - heartbeatExpireInterval));
+ }
+
/** Add a datanode. */
private void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
- synchronized (namesystem.datanodeMap) {
- host2DatanodeMap.remove(
- namesystem.datanodeMap.put(node.getStorageID(), node));
+ synchronized(datanodeMap) {
+ host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
}
host2DatanodeMap.add(node);
@@ -152,8 +276,8 @@ public class DatanodeManager {
/** Physically remove node from datanodeMap. */
private void wipeDatanode(final DatanodeID node) throws IOException {
final String key = node.getStorageID();
- synchronized (namesystem.datanodeMap) {
- host2DatanodeMap.remove(namesystem.datanodeMap.remove(key));
+ synchronized (datanodeMap) {
+ host2DatanodeMap.remove(datanodeMap.remove(key));
}
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
@@ -315,7 +439,7 @@ public class DatanodeManager {
String newID = null;
while(newID == null) {
newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
- if (namesystem.datanodeMap.get(newID) != null)
+ if (datanodeMap.get(newID) != null)
newID = null;
}
return newID;
@@ -350,7 +474,7 @@ public class DatanodeManager {
+ "node registration from " + nodeReg.getName()
+ " storage " + nodeReg.getStorageID());
- DatanodeDescriptor nodeS = namesystem.datanodeMap.get(nodeReg.getStorageID());
+ DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getName());
if (nodeN != null && nodeN != nodeS) {
@@ -461,7 +585,7 @@ public class DatanodeManager {
* 4. Removed from exclude --> stop decommission.
*/
public void refreshDatanodes() throws IOException {
- for(DatanodeDescriptor node : namesystem.datanodeMap.values()) {
+ for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node, null)) {
node.setDisallowed(true); // case 2.
@@ -475,6 +599,45 @@ public class DatanodeManager {
}
}
+ /** @return the number of live datanodes. */
+ public int getNumLiveDataNodes() {
+ int numLive = 0;
+ synchronized (datanodeMap) {
+ for(DatanodeDescriptor dn : datanodeMap.values()) {
+ if (!isDatanodeDead(dn) ) {
+ numLive++;
+ }
+ }
+ }
+ return numLive;
+ }
+
+ /** @return the number of dead datanodes. */
+ public int getNumDeadDataNodes() {
+ int numDead = 0;
+ synchronized (datanodeMap) {
+ for(DatanodeDescriptor dn : datanodeMap.values()) {
+ if (isDatanodeDead(dn) ) {
+ numDead++;
+ }
+ }
+ }
+ return numDead;
+ }
+
+ /** Fetch live and dead datanodes. */
+ public void fetchDatanodes(final List<DatanodeDescriptor> live,
+ final List<DatanodeDescriptor> dead) {
+ final List<DatanodeDescriptor> results =
+ getDatanodeListForReport(DatanodeReportType.ALL);
+ for(DatanodeDescriptor node : results) {
+ if (isDatanodeDead(node))
+ dead.add(node);
+ else
+ live.add(node);
+ }
+ }
+
/** For generating datanode reports */
public List<DatanodeDescriptor> getDatanodeListForReport(
final DatanodeReportType type) {
@@ -499,13 +662,13 @@ public class DatanodeManager {
ArrayList<DatanodeDescriptor> nodes = null;
- synchronized (namesystem.datanodeMap) {
- nodes = new ArrayList<DatanodeDescriptor>(namesystem.datanodeMap.size() +
+ synchronized(datanodeMap) {
+ nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
mustList.size());
- Iterator<DatanodeDescriptor> it = namesystem.datanodeMap.values().iterator();
+ Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
while (it.hasNext()) {
DatanodeDescriptor dn = it.next();
- boolean isDead = namesystem.isDatanodeDead(dn);
+ final boolean isDead = isDatanodeDead(dn);
if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
nodes.add(dn);
}
@@ -537,4 +700,77 @@ public class DatanodeManager {
}
return nodes;
}
+
+ private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
+ node.setLastUpdate(0);
+ }
+
+ /** Handle heartbeat from datanodes. */
+ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
+ final String blockPoolId,
+ long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+ int xceiverCount, int maxTransfers, int failedVolumes
+ ) throws IOException {
+ synchronized (namesystem.heartbeats) {
+ synchronized (datanodeMap) {
+ DatanodeDescriptor nodeinfo = null;
+ try {
+ nodeinfo = getDatanode(nodeReg);
+ } catch(UnregisteredNodeException e) {
+ return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+ }
+
+ // Check if this datanode should actually be shutdown instead.
+ if (nodeinfo != null && nodeinfo.isDisallowed()) {
+ setDatanodeDead(nodeinfo);
+ throw new DisallowedDatanodeException(nodeinfo);
+ }
+
+ if (nodeinfo == null || !nodeinfo.isAlive) {
+ return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+ }
+
+ namesystem.updateStats(nodeinfo, false);
+ nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
+ xceiverCount, failedVolumes);
+ namesystem.updateStats(nodeinfo, true);
+
+ //check lease recovery
+ BlockInfoUnderConstruction[] blocks = nodeinfo
+ .getLeaseRecoveryCommand(Integer.MAX_VALUE);
+ if (blocks != null) {
+ BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
+ blocks.length);
+ for (BlockInfoUnderConstruction b : blocks) {
+ brCommand.add(new RecoveringBlock(
+ new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
+ .getBlockRecoveryId()));
+ }
+ return new DatanodeCommand[] { brCommand };
+ }
+
+ final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
+ //check pending replication
+ List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
+ maxTransfers);
+ if (pendingList != null) {
+ cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+ pendingList));
+ }
+ //check block invalidation
+ Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+ if (blks != null) {
+ cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
+ blockPoolId, blks));
+ }
+
+ namesystem.addKeyUpdateCommand(cmds, nodeinfo);
+ if (!cmds.isEmpty()) {
+ return cmds.toArray(new DatanodeCommand[cmds.size()]);
+ }
+ }
+ }
+
+ return null;
+ }
}
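In the new DatanodeManager, liveness is a pure time comparison: a datanode is dead once its last heartbeat is older than 2 * heartbeatRecheckInterval + 10 * heartbeatInterval. A self-contained sketch of that rule, assuming the default values referenced above (5-minute recheck, 3-second heartbeat); the class itself is illustrative:

    class HeartbeatExpiry {
      // Assumed defaults per the DFSConfigKeys constants cited in the patch:
      // dfs.namenode.heartbeat.recheck-interval = 5 min, dfs.heartbeat.interval = 3 s.
      static final long RECHECK_MS = 5 * 60 * 1000;
      static final long HEARTBEAT_SECONDS = 3;

      // Mirrors the constructor math:
      // heartbeatExpireInterval = 2 * recheck + 10 * 1000 * heartbeatSeconds.
      static final long EXPIRE_MS = 2 * RECHECK_MS + 10 * 1000 * HEARTBEAT_SECONDS;

      // Same comparison as isDatanodeDead().
      static boolean isDead(long lastUpdateMs, long nowMs) {
        return lastUpdateMs < nowMs - EXPIRE_MS;
      }
    }

With those defaults EXPIRE_MS is 630,000 ms, so a datanode is declared dead about 10.5 minutes after its last heartbeat.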
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Wed Jul 27 05:46:52 2011
@@ -83,8 +83,8 @@ class DecommissionManager {
private void check() {
int count = 0;
for(Map.Entry<String, DatanodeDescriptor> entry
- : new CyclicIteration<String, DatanodeDescriptor>(
- fsnamesystem.datanodeMap, firstkey)) {
+ : blockManager.getDatanodeManager().getDatanodeCyclicIteration(
+ firstkey)) {
final DatanodeDescriptor d = entry.getValue();
firstkey = entry.getKey();
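getDatanodeCyclicIteration lets DecommissionManager resume its scan at the key after the one it last visited, wrapping around the end of datanodeMap. A rough sketch of that resume-from-key idea using plain NavigableMap views (this illustrates the iteration order; it is not the CyclicIteration implementation):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;

    class ResumeScan {
      // Entries strictly after firstKey come first; the rest wrap around
      // from the start of the map, ending with firstKey itself.
      static <V> List<Map.Entry<String, V>> cycle(
          NavigableMap<String, V> map, String firstKey) {
        final List<Map.Entry<String, V>> order =
            new ArrayList<Map.Entry<String, V>>();
        order.addAll(map.tailMap(firstKey, false).entrySet()); // after firstKey
        order.addAll(map.headMap(firstKey, true).entrySet());  // wrap to start
        return order;
      }
    }

Because check() records firstkey = entry.getKey() as it goes, each periodic pass picks up where the previous one left off instead of rescanning from the smallest storage ID.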
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Jul 27 05:46:52 2011
@@ -237,35 +237,9 @@ public class FSNamesystem implements FSC
// Block pool ID used by this namenode
String blockPoolId;
-
- /**
- * Stores the datanode -> block map.
- * <p>
- * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
- * storage id. In order to keep the storage map consistent it tracks
- * all storages ever registered with the namenode.
- * A descriptor corresponding to a specific storage id can be
- * <ul>
- * <li>added to the map if it is a new storage id;</li>
- * <li>updated with a new datanode started as a replacement for the old one
- * with the same storage id; and </li>
- * <li>removed if and only if an existing datanode is restarted to serve a
- * different storage id.</li>
- * </ul> <br>
- * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
- * in the namespace image file. Only the {@link DatanodeInfo} part is
- * persistent, the list of blocks is restored from the datanode block
- * reports.
- * <p>
- * Mapping: StorageID -> DatanodeDescriptor
- */
- public final NavigableMap<String, DatanodeDescriptor> datanodeMap =
- new TreeMap<String, DatanodeDescriptor>();
/**
- * Stores a set of DatanodeDescriptor objects.
- * This is a subset of {@link #datanodeMap}, containing nodes that are
- * considered alive.
+ * Stores a subset of datanodeMap, containing nodes that are considered alive.
* The HeartbeatMonitor periodically checks for out-dated entries,
* and removes them from the list.
*/
@@ -289,9 +263,6 @@ public class FSNamesystem implements FSC
// heartbeatRecheckInterval is how often namenode checks for expired datanodes
private long heartbeatRecheckInterval;
- // heartbeatExpireInterval is how long namenode waits for datanode to report
- // heartbeat
- private long heartbeatExpireInterval;
//resourceRecheckInterval is how often namenode checks for the disk space availability
private long resourceRecheckInterval;
@@ -314,9 +285,6 @@ public class FSNamesystem implements FSC
*/
private final GenerationStamp generationStamp = new GenerationStamp();
- // Ask Datanode only up to this many blocks to delete.
- public int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
-
// precision of access times.
private long accessTimePrecision = 0;
@@ -513,14 +481,9 @@ public class FSNamesystem implements FSC
this.defaultPermission = PermissionStatus.createImmutable(
fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
- long heartbeatInterval = conf.getLong(
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
this.heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
- this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
- 10 * heartbeatInterval;
this.serverDefaults = new FsServerDefaults(
conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
@@ -531,14 +494,6 @@ public class FSNamesystem implements FSC
this.maxFsObjects = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
- //default limit
- this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit,
- 20*(int)(heartbeatInterval/1000));
- //use conf value if it is set.
- this.blockInvalidateLimit = conf.getInt(
- DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, this.blockInvalidateLimit);
- LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + "=" + this.blockInvalidateLimit);
-
this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
this.supportAppends = conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
@@ -642,12 +597,7 @@ public class FSNamesystem implements FSC
out.println("Live Datanodes: "+live.size());
out.println("Dead Datanodes: "+dead.size());
blockManager.metaSave(out);
-
- //
- // Dump all datanodes
- //
- datanodeDump(out);
-
+
out.flush();
out.close();
} finally {
@@ -688,45 +638,7 @@ public class FSNamesystem implements FSC
readLock();
try {
checkSuperuserPrivilege();
-
- DatanodeDescriptor node = getDatanode(datanode);
- if (node == null) {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
- + "Asking for blocks from an unrecorded node " + datanode.getName());
- throw new IllegalArgumentException(
- "Unexpected exception. Got getBlocks message for datanode " +
- datanode.getName() + ", but there is no info for it");
- }
-
- int numBlocks = node.numBlocks();
- if(numBlocks == 0) {
- return new BlocksWithLocations(new BlockWithLocations[0]);
- }
- Iterator<BlockInfo> iter = node.getBlockIterator();
- int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
- // skip blocks
- for(int i=0; i<startBlock; i++) {
- iter.next();
- }
- List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
- long totalSize = 0;
- BlockInfo curBlock;
- while(totalSize<size && iter.hasNext()) {
- curBlock = iter.next();
- if(!curBlock.isComplete()) continue;
- totalSize += addBlock(curBlock, results);
- }
- if(totalSize<size) {
- iter = node.getBlockIterator(); // start from the beginning
- for(int i=0; i<startBlock&&totalSize<size; i++) {
- curBlock = iter.next();
- if(!curBlock.isComplete()) continue;
- totalSize += addBlock(curBlock, results);
- }
- }
-
- return new BlocksWithLocations(
- results.toArray(new BlockWithLocations[results.size()]));
+ return blockManager.getBlocksWithLocations(datanode, size);
} finally {
readUnlock();
}
@@ -742,22 +654,6 @@ public class FSNamesystem implements FSC
: ExportedBlockKeys.DUMMY_KEYS;
}
- /**
- * Get all valid locations of the block & add the block to results
- * return the length of the added block; 0 if the block is not added
- */
- private long addBlock(Block block, List<BlockWithLocations> results) {
- assert hasReadOrWriteLock();
- ArrayList<String> machineSet = blockManager.getValidLocations(block);
- if(machineSet.size() == 0) {
- return 0;
- } else {
- results.add(new BlockWithLocations(block,
- machineSet.toArray(new String[machineSet.size()])));
- return block.getNumBytes();
- }
- }
-
/////////////////////////////////////////////////////////
//
// These methods are called by HadoopFS clients
@@ -1795,7 +1691,8 @@ public class FSNamesystem implements FSC
//find datanode descriptors
chosen = new ArrayList<DatanodeDescriptor>();
for(DatanodeInfo d : existings) {
- final DatanodeDescriptor descriptor = getDatanode(d);
+ final DatanodeDescriptor descriptor = blockManager.getDatanodeManager(
+ ).getDatanode(d);
if (descriptor != null) {
chosen.add(descriptor);
}
@@ -2622,7 +2519,8 @@ public class FSNamesystem implements FSC
if (newtargets.length > 0) {
descriptors = new DatanodeDescriptor[newtargets.length];
for(int i = 0; i < newtargets.length; i++) {
- descriptors[i] = getDatanode(newtargets[i]);
+ descriptors[i] = blockManager.getDatanodeManager().getDatanode(
+ newtargets[i]);
}
}
if (closeFile) {
@@ -2766,15 +2664,6 @@ public class FSNamesystem implements FSC
return Storage.getRegistrationID(dir.fsImage.getStorage());
}
- public boolean isDatanodeDead(DatanodeDescriptor node) {
- return (node.getLastUpdate() <
- (now() - heartbeatExpireInterval));
- }
-
- private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
- node.setLastUpdate(0);
- }
-
/**
* The given node has reported in. This method should:
* 1) Record the heartbeat, so the datanode isn't timed out
@@ -2792,91 +2681,32 @@ public class FSNamesystem implements FSC
throws IOException {
readLock();
try {
- return handleHeartbeatInternal(nodeReg, capacity, dfsUsed,
- remaining, blockPoolUsed, xceiverCount, xmitsInProgress,
- failedVolumes);
+ final int maxTransfer = blockManager.maxReplicationStreams - xmitsInProgress;
+ DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
+ nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
+ xceiverCount, maxTransfer, failedVolumes);
+ if (cmds != null) {
+ return cmds;
+ }
+
+ //check distributed upgrade
+ DatanodeCommand cmd = getDistributedUpgradeCommand();
+ if (cmd != null) {
+ return new DatanodeCommand[] {cmd};
+ }
+ return null;
} finally {
readUnlock();
}
}
- /** @see #handleHeartbeat(DatanodeRegistration, long, long, long, long, int, int, int) */
- DatanodeCommand[] handleHeartbeatInternal(DatanodeRegistration nodeReg,
- long capacity, long dfsUsed, long remaining, long blockPoolUsed,
- int xceiverCount, int xmitsInProgress, int failedVolumes)
- throws IOException {
- assert hasReadLock();
- DatanodeCommand cmd = null;
- synchronized (heartbeats) {
- synchronized (datanodeMap) {
- DatanodeDescriptor nodeinfo = null;
- try {
- nodeinfo = getDatanode(nodeReg);
- } catch(UnregisteredNodeException e) {
- return new DatanodeCommand[]{DatanodeCommand.REGISTER};
- }
-
- // Check if this datanode should actually be shutdown instead.
- if (nodeinfo != null && nodeinfo.isDisallowed()) {
- setDatanodeDead(nodeinfo);
- throw new DisallowedDatanodeException(nodeinfo);
- }
-
- if (nodeinfo == null || !nodeinfo.isAlive) {
- return new DatanodeCommand[]{DatanodeCommand.REGISTER};
- }
-
- updateStats(nodeinfo, false);
- nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
- xceiverCount, failedVolumes);
- updateStats(nodeinfo, true);
-
- //check lease recovery
- BlockInfoUnderConstruction[] blocks = nodeinfo
- .getLeaseRecoveryCommand(Integer.MAX_VALUE);
- if (blocks != null) {
- BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
- blocks.length);
- for (BlockInfoUnderConstruction b : blocks) {
- brCommand.add(new RecoveringBlock(
- new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
- .getBlockRecoveryId()));
- }
- return new DatanodeCommand[] { brCommand };
- }
-
- ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
- //check pending replication
- List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
- blockManager.maxReplicationStreams - xmitsInProgress);
- if (pendingList != null) {
- cmd = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
- pendingList);
- cmds.add(cmd);
- }
- //check block invalidation
- Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
- if (blks != null) {
- cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks);
- cmds.add(cmd);
- }
- // check access key update
- if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
- cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
- nodeinfo.needKeyUpdate = false;
- }
- if (!cmds.isEmpty()) {
- return cmds.toArray(new DatanodeCommand[cmds.size()]);
- }
- }
- }
-
- //check distributed upgrade
- cmd = getDistributedUpgradeCommand();
- if (cmd != null) {
- return new DatanodeCommand[] {cmd};
+ public void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
+ final DatanodeDescriptor nodeinfo) {
+ // check access key update
+ if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
+ cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
+ nodeinfo.needKeyUpdate = false;
}
- return null;
}
public void updateStats(DatanodeDescriptor node, boolean isAdded) {
@@ -3017,7 +2847,8 @@ public class FSNamesystem implements FSC
) throws UnregisteredNodeException {
writeLock();
try {
- DatanodeDescriptor nodeInfo = getDatanode(nodeID);
+ DatanodeDescriptor nodeInfo = getBlockManager().getDatanodeManager(
+ ).getDatanode(nodeID);
if (nodeInfo != null) {
removeDatanode(nodeInfo);
} else {
@@ -3033,7 +2864,7 @@ public class FSNamesystem implements FSC
* Remove a datanode descriptor.
* @param nodeInfo datanode descriptor.
*/
- private void removeDatanode(DatanodeDescriptor nodeInfo) {
+ public void removeDatanode(DatanodeDescriptor nodeInfo) {
assert hasWriteLock();
synchronized (heartbeats) {
if (nodeInfo.isAlive) {
@@ -3064,6 +2895,7 @@ public class FSNamesystem implements FSC
* effect causes more datanodes to be declared dead.
*/
void heartbeatCheck() {
+ final DatanodeManager datanodeManager = getBlockManager().getDatanodeManager();
// It's OK to check safe mode w/o taking the lock here, we re-check
// for safe mode after taking the lock before removing a datanode.
if (isInSafeMode()) {
@@ -3079,7 +2911,7 @@ public class FSNamesystem implements FSC
for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
it.hasNext();) {
DatanodeDescriptor nodeInfo = it.next();
- if (isDatanodeDead(nodeInfo)) {
+ if (datanodeManager.isDatanodeDead(nodeInfo)) {
expiredHeartbeats.incr();
foundDead = true;
nodeID = nodeInfo;
@@ -3095,21 +2927,7 @@ public class FSNamesystem implements FSC
return;
}
try {
- synchronized(heartbeats) {
- synchronized (datanodeMap) {
- DatanodeDescriptor nodeInfo = null;
- try {
- nodeInfo = getDatanode(nodeID);
- } catch (IOException e) {
- nodeInfo = null;
- }
- if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
- + "lost heartbeat from " + nodeInfo.getName());
- removeDatanode(nodeInfo);
- }
- }
- }
+ datanodeManager.removeDeadDatanode(nodeID);
} finally {
writeUnlock();
}
@@ -3129,7 +2947,8 @@ public class FSNamesystem implements FSC
writeLock();
startTime = now(); //after acquiring write lock
try {
- DatanodeDescriptor node = getDatanode(nodeID);
+ final DatanodeDescriptor node = blockManager.getDatanodeManager(
+ ).getDatanode(nodeID);
if (node == null || !node.isAlive) {
throw new IOException("ProcessReport from dead or unregistered node: "
+ nodeID.getName());
@@ -3269,7 +3088,8 @@ public class FSNamesystem implements FSC
) throws IOException {
writeLock();
try {
- DatanodeDescriptor node = getDatanode(nodeID);
+ final DatanodeDescriptor node = blockManager.getDatanodeManager(
+ ).getDatanode(nodeID);
if (node == null || !node.isAlive) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
+ " is received from dead or unregistered node " + nodeID.getName());
@@ -3475,33 +3295,7 @@ public class FSNamesystem implements FSC
ArrayList<DatanodeDescriptor> dead) {
readLock();
try {
- final List<DatanodeDescriptor> results = getBlockManager(
- ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.ALL);
- for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- if (isDatanodeDead(node))
- dead.add(node);
- else
- live.add(node);
- }
- } finally {
- readUnlock();
- }
- }
-
- /**
- * Prints information about all datanodes.
- */
- private void datanodeDump(PrintWriter out) {
- readLock();
- try {
- synchronized (datanodeMap) {
- out.println("Metasave: Number of datanodes: " + datanodeMap.size());
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- out.println(node.dumpDatanode());
- }
- }
+ getBlockManager().getDatanodeManager().fetchDatanodes(live, dead);
} finally {
readUnlock();
}
@@ -3556,30 +3350,6 @@ public class FSNamesystem implements FSC
checkSuperuserPrivilege();
getFSImage().finalizeUpgrade();
}
-
-
- /**
- * Get data node by storage ID.
- *
- * @param nodeID
- * @return DatanodeDescriptor or null if the node is not found.
- * @throws IOException
- */
- public DatanodeDescriptor getDatanode(DatanodeID nodeID
- ) throws UnregisteredNodeException {
- assert hasReadOrWriteLock();
- UnregisteredNodeException e = null;
- DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
- if (node == null)
- return null;
- if (!node.getName().equals(nodeID.getName())) {
- e = new UnregisteredNodeException(nodeID, node);
- NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
- + e.getLocalizedMessage());
- throw e;
- }
- return node;
- }
/**
* SafeModeInfo contains information related to the safe mode.
@@ -4503,43 +4273,14 @@ public class FSNamesystem implements FSC
}
- /**
- * Number of live data nodes
- * @return Number of live data nodes
- */
@Override // FSNamesystemMBean
public int getNumLiveDataNodes() {
- int numLive = 0;
- synchronized (datanodeMap) {
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- if (!isDatanodeDead(dn) ) {
- numLive++;
- }
- }
- }
- return numLive;
+ return getBlockManager().getDatanodeManager().getNumLiveDataNodes();
}
-
- /**
- * Number of dead data nodes
- * @return Number of dead data nodes
- */
@Override // FSNamesystemMBean
public int getNumDeadDataNodes() {
- int numDead = 0;
- synchronized (datanodeMap) {
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- if (isDatanodeDead(dn) ) {
- numDead++;
- }
- }
- }
- return numDead;
+ return getBlockManager().getDatanodeManager().getNumDeadDataNodes();
}
/**
@@ -4699,11 +4440,12 @@ public class FSNamesystem implements FSC
blockinfo.setNumBytes(newBlock.getNumBytes());
// find the DatanodeDescriptor objects
+ final DatanodeManager dm = getBlockManager().getDatanodeManager();
DatanodeDescriptor[] descriptors = null;
if (newNodes.length > 0) {
descriptors = new DatanodeDescriptor[newNodes.length];
for(int i = 0; i < newNodes.length; i++) {
- descriptors[i] = getDatanode(newNodes[i]);
+ descriptors[i] = dm.getDatanode(newNodes[i]);
}
}
blockinfo.setExpectedLocations(descriptors);
@@ -4832,12 +4574,6 @@ public class FSNamesystem implements FSC
return blockManager.numCorruptReplicas(blk);
}
- /** Get a datanode descriptor given corresponding storageID */
- public DatanodeDescriptor getDatanode(String nodeID) {
- assert hasReadOrWriteLock();
- return datanodeMap.get(nodeID);
- }
-
/**
* Return a range of corrupt replica block ids. Up to numExpectedBlocks
* blocks starting at the next block after startingBlockId are returned
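One invariant this refactoring preserves: membership and liveness updates still nest the heartbeats monitor outside the datanodeMap monitor, as both handleHeartbeat() and removeDeadDatanode() do above. A schematic of that nesting, with placeholder objects standing in for the real collections:

    class LockOrder {
      private final Object heartbeats = new Object();  // stands in for the heartbeats list
      private final Object datanodeMap = new Object(); // stands in for the datanode map

      void updateMembership(Runnable change) {
        // Every writer takes the locks in this order, so no two writers
        // can deadlock by acquiring them in opposite orders.
        synchronized (heartbeats) {
          synchronized (datanodeMap) {
            change.run();
          }
        }
      }
    }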
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java Wed Jul 27 05:46:52 2011
@@ -226,7 +226,7 @@ public class TestDecommission {
writeConfigFile(excludeFile, nodes);
cluster.getNamesystem(nnIndex).refreshNodes(conf);
DatanodeInfo ret = NameNodeAdapter.getDatanode(
- cluster.getNameNode(nnIndex), info[index]);
+ cluster.getNamesystem(nnIndex), info[index]);
waitNodeState(ret, waitForState);
return ret;
}
@@ -466,7 +466,7 @@ public class TestDecommission {
// Stop decommissioning and verify stats
writeConfigFile(excludeFile, null);
fsn.refreshNodes(conf);
- DatanodeInfo ret = NameNodeAdapter.getDatanode(namenode, downnode);
+ DatanodeInfo ret = NameNodeAdapter.getDatanode(fsn, downnode);
waitNodeState(ret, AdminStates.NORMAL);
verifyStats(namenode, fsn, ret, false);
}
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Wed Jul 27 05:46:52 2011
@@ -25,9 +25,30 @@ import java.util.Set;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.Daemon;
public class BlockManagerTestUtil {
+
+ /** @return the datanode descriptor for the given storageID. */
+ public static DatanodeDescriptor getDatanode(final FSNamesystem ns,
+ final String storageID) {
+ ns.readLock();
+ try {
+ return ns.getBlockManager().getDatanodeManager().getDatanode(storageID);
+ } finally {
+ ns.readUnlock();
+ }
+ }
+
+
+ /**
+ * Refresh block queue counts on the name-node.
+ */
+ public static void updateState(final BlockManager blockManager) {
+ blockManager.updateState();
+ }
+
/**
* @return a tuple of the replica state (number racks, number live
* replicas, and number needed replicas) for the given block.
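For tests, lookups that used to call package-private FSNamesystem methods now route through these helpers. A sketch of the new test-side access path (MiniDFSCluster setup is elided, and storageID is assumed to identify a registered datanode):

    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
    import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

    class TestAccessSketch {
      static DatanodeDescriptor lookup(MiniDFSCluster cluster, String storageID) {
        final FSNamesystem ns = cluster.getNamesystem();
        BlockManagerTestUtil.updateState(ns.getBlockManager()); // refresh queue counts
        return BlockManagerTestUtil.getDatanode(ns, storageID); // lookup under read lock
      }
    }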
Copied: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java (from r1151310, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java?p2=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java&p1=hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java&r1=1151310&r2=1151339&rev=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java Wed Jul 27 05:46:52 2011
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
import junit.framework.TestCase;
@@ -23,8 +23,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
/**
* Test if FSNamesystem handles heartbeat right
@@ -41,6 +43,8 @@ public class TestComputeInvalidateWork e
try {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
+ final BlockManager bm = namesystem.getBlockManager();
+ final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit;
DatanodeDescriptor[] nodes =
namesystem.heartbeats.toArray(new DatanodeDescriptor[NUM_OF_DATANODES]);
assertEquals(nodes.length, NUM_OF_DATANODES);
@@ -48,26 +52,25 @@ public class TestComputeInvalidateWork e
namesystem.writeLock();
try {
for (int i=0; i<nodes.length; i++) {
- for(int j=0; j<3*namesystem.blockInvalidateLimit+1; j++) {
- Block block = new Block(i*(namesystem.blockInvalidateLimit+1)+j, 0,
+ for(int j=0; j<3*blockInvalidateLimit+1; j++) {
+ Block block = new Block(i*(blockInvalidateLimit+1)+j, 0,
GenerationStamp.FIRST_VALID_STAMP);
- namesystem.getBlockManager().addToInvalidates(block, nodes[i]);
+ bm.addToInvalidates(block, nodes[i]);
}
}
- assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
- namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES+1));
- assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
- namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES));
- assertEquals(namesystem.blockInvalidateLimit*(NUM_OF_DATANODES-1),
- namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES-1));
- int workCount = namesystem.getBlockManager().computeInvalidateWork(1);
+ assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
+ bm.computeInvalidateWork(NUM_OF_DATANODES+1));
+ assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
+ bm.computeInvalidateWork(NUM_OF_DATANODES));
+ assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1),
+ bm.computeInvalidateWork(NUM_OF_DATANODES-1));
+ int workCount = bm.computeInvalidateWork(1);
if (workCount == 1) {
- assertEquals(namesystem.blockInvalidateLimit+1,
- namesystem.getBlockManager().computeInvalidateWork(2));
+ assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
} else {
- assertEquals(workCount, namesystem.blockInvalidateLimit);
- assertEquals(2, namesystem.getBlockManager().computeInvalidateWork(2));
+ assertEquals(workCount, blockInvalidateLimit);
+ assertEquals(2, bm.computeInvalidateWork(2));
}
} finally {
namesystem.writeUnlock();
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java Wed Jul 27 05:46:52 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -54,14 +55,9 @@ public class TestHeartbeatHandling exten
final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
-
- namesystem.readLock();
- DatanodeDescriptor dd;
- try {
- dd = namesystem.getDatanode(nodeReg);
- } finally {
- namesystem.readUnlock();
- }
+
+
+ final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
final int REMAINING_BLOCKS = 1;
final int MAX_REPLICATE_LIMIT =
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Wed Jul 27 05:46:52 2011
@@ -596,7 +596,7 @@ public class TestBlockReport {
}
private void printStats() {
- NameNodeAdapter.refreshBlockCounts(cluster.getNameNode());
+ BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
if(LOG.isDebugEnabled()) {
LOG.debug("Missing " + cluster.getNamesystem().getMissingBlocksCount());
LOG.debug("Corrupted " + cluster.getNamesystem().getCorruptReplicaBlocks());
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Wed Jul 27 05:46:52 2011
@@ -46,14 +46,6 @@ public class NameNodeAdapter {
}
/**
- * Refresh block queue counts on the name-node.
- * @param namenode to proxy the invocation to
- */
- public static void refreshBlockCounts(NameNode namenode) {
- namenode.getNamesystem().getBlockManager().updateState();
- }
-
- /**
* Get the internal RPC server instance.
* @return rpc server
*/
@@ -68,12 +60,11 @@ public class NameNodeAdapter {
/**
* Return the datanode descriptor for the given datanode.
*/
- public static DatanodeDescriptor getDatanode(NameNode namenode,
+ public static DatanodeDescriptor getDatanode(final FSNamesystem ns,
DatanodeID id) throws IOException {
- FSNamesystem ns = namenode.getNamesystem();
ns.readLock();
try {
- return ns.getDatanode(id);
+ return ns.getBlockManager().getDatanodeManager().getDatanode(id);
} finally {
ns.readUnlock();
}
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java?rev=1151339&r1=1151338&r2=1151339&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java Wed Jul 27 05:46:52 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -61,13 +62,8 @@ public class TestDeadDatanode {
FSNamesystem namesystem = cluster.getNamesystem();
String state = alive ? "alive" : "dead";
while (System.currentTimeMillis() < stopTime) {
- namesystem.readLock();
- DatanodeDescriptor dd;
- try {
- dd = namesystem.getDatanode(nodeID);
- } finally {
- namesystem.readUnlock();
- }
+ final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
+ namesystem, nodeID);
if (dd.isAlive == alive) {
LOG.info("datanode " + nodeID + " is " + state);
return;