You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2009/05/07 00:32:35 UTC
svn commit: r772450 [2/2] - in /hadoop/core/trunk: ./
src/hdfs/org/apache/hadoop/hdfs/server/namenode/
src/test/org/apache/hadoop/hdfs/
src/test/org/apache/hadoop/hdfs/server/namenode/
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed May 6 22:32:34 2009
@@ -40,9 +40,7 @@
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
@@ -50,6 +48,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -127,24 +126,13 @@
private FSNamesystemMetrics myFSMetrics;
private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
private int totalLoad = 0;
- private long pendingReplicationBlocksCount = 0L, corruptReplicaBlocksCount,
- underReplicatedBlocksCount = 0L, scheduledReplicationBlocksCount = 0L;
//
// Stores the correct file name hierarchy
//
public FSDirectory dir;
- //
- // Mapping: Block -> { INode, datanodes, self ref }
- // Updated only in response to client-sent information.
- //
- BlocksMap blocksMap = new BlocksMap();
-
- //
- // Store blocks-->datanodedescriptor(s) map of corrupt replicas
- //
- public CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
+ BlockManager blockManager;
/**
* Stores the datanode -> block map.
@@ -170,24 +158,6 @@
NavigableMap<String, DatanodeDescriptor> datanodeMap =
new TreeMap<String, DatanodeDescriptor>();
- //
- // Keeps a Collection for every named machine containing
- // blocks that have recently been invalidated and are thought to live
- // on the machine in question.
- // Mapping: StorageID -> ArrayList<Block>
- //
- private Map<String, Collection<Block>> recentInvalidateSets =
- new TreeMap<String, Collection<Block>>();
-
- //
- // Keeps a TreeSet for every named node. Each treeset contains
- // a list of the blocks that are "extra" at that location. We'll
- // eventually remove these extras.
- // Mapping: StorageID -> TreeSet<Block>
- //
- Map<String, Collection<Block>> excessReplicateMap =
- new TreeMap<String, Collection<Block>>();
-
Random r = new Random();
/**
@@ -199,14 +169,6 @@
*/
ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
- //
- // Store set of Blocks that need to be replicated 1 or more times.
- // We also store pending replication-orders.
- // Set of: Block
- //
- private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
- private PendingReplicationBlocks pendingReplications;
-
public LeaseManager leaseManager = new LeaseManager(this);
//
@@ -221,14 +183,6 @@
private volatile boolean fsRunning = true;
long systemStart = 0;
- // The maximum number of replicates we should allow for a single block
- private int maxReplication;
- // How many outgoing replication streams a given node should have at one time
- private int maxReplicationStreams;
- // MIN_REPLICATION is how many copies we need in place or else we disallow the write
- private int minReplication;
- // Default replication
- private int defaultReplication;
// heartbeatRecheckInterval is how often namenode checks for expired datanodes
private long heartbeatRecheckInterval;
// heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -241,22 +195,12 @@
// allow appending to hdfs files
private boolean supportAppends = true;
- /**
- * Last block index used for replication work.
- */
- private int replIndex = 0;
- private long missingBlocksInCurIter = 0;
- private long missingBlocksInPrevIter = 0;
-
private SafeModeInfo safeMode; // safe mode information
private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
// datanode networktoplogy
NetworkTopology clusterMap = new NetworkTopology();
private DNSToSwitchMapping dnsToSwitchMapping;
-
- // for block replicas placement
- ReplicationTargetChooser replicator;
private HostsFileReader hostsReader;
private Daemon dnthread = null;
@@ -292,8 +236,8 @@
*/
private void initialize(Configuration conf, FSImage fsImage) throws IOException {
this.systemStart = now();
+ this.blockManager = new BlockManager(this, conf);
setConfigurationParameters(conf);
-
this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
if(fsImage == null) {
this.dir = new FSDirectory(this, conf);
@@ -308,9 +252,6 @@
this.dir = new FSDirectory(fsImage, this, conf);
}
this.safeMode = new SafeModeInfo(conf);
- pendingReplications = new PendingReplicationBlocks(
- conf.getInt("dfs.replication.pending.timeout.sec",
- -1) * 1000L);
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
conf.get("dfs.hosts.exclude",""));
}
@@ -320,7 +261,7 @@
*/
void activate(Configuration conf) throws IOException {
setBlockTotal();
- pendingReplications.start();
+ blockManager.activate();
this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(leaseManager.new Monitor());
this.replthread = new Daemon(new ReplicationMonitor());
@@ -393,6 +334,7 @@
* is stored
*/
FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
+ this.blockManager = new BlockManager(this, conf);
setConfigurationParameters(conf);
this.dir = new FSDirectory(fsImage, this, conf);
}
@@ -436,30 +378,6 @@
this.defaultPermission = PermissionStatus.createImmutable(
fsOwner.getUserName(), supergroup, new FsPermission(filePermission));
-
- this.replicator = new ReplicationTargetChooser(
- conf.getBoolean("dfs.replication.considerLoad", true),
- this,
- clusterMap);
- this.defaultReplication = conf.getInt("dfs.replication", 3);
- this.maxReplication = conf.getInt("dfs.replication.max", 512);
- this.minReplication = conf.getInt("dfs.replication.min", 1);
- if (minReplication <= 0)
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.min = "
- + minReplication
- + " must be greater than 0");
- if (maxReplication >= (int)Short.MAX_VALUE)
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.max = "
- + maxReplication + " must be less than " + (Short.MAX_VALUE));
- if (maxReplication < minReplication)
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.min = "
- + minReplication
- + " must be less than dfs.replication.max = "
- + maxReplication);
- this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
this.heartbeatRecheckInterval = conf.getInt(
"heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
@@ -497,7 +415,7 @@
public void close() {
fsRunning = false;
try {
- if (pendingReplications != null) pendingReplications.stop();
+ if (blockManager != null) blockManager.close();
if (hbthread != null) hbthread.interrupt();
if (replthread != null) replthread.interrupt();
if (dnthread != null) dnthread.interrupt();
@@ -534,48 +452,8 @@
filename);
PrintWriter out = new PrintWriter(new BufferedWriter(
new FileWriter(file, true)));
-
-
- //
- // Dump contents of neededReplication
- //
- synchronized (neededReplications) {
- out.println("Metasave: Blocks waiting for replication: " +
- neededReplications.size());
- for (Block block : neededReplications) {
- List<DatanodeDescriptor> containingNodes =
- new ArrayList<DatanodeDescriptor>();
- NumberReplicas numReplicas = new NumberReplicas();
- // source node returned is not used
- chooseSourceDatanode(block, containingNodes, numReplicas);
- int usableReplicas = numReplicas.liveReplicas() +
- numReplicas.decommissionedReplicas();
- // l: == live:, d: == decommissioned c: == corrupt e: == excess
- out.print(block + " (replicas:" +
- " l: " + numReplicas.liveReplicas() +
- " d: " + numReplicas.decommissionedReplicas() +
- " c: " + numReplicas.corruptReplicas() +
- " e: " + numReplicas.excessReplicas() +
- ((usableReplicas > 0)? "" : " MISSING") + ")");
-
- for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
- jt.hasNext();) {
- DatanodeDescriptor node = jt.next();
- out.print(" " + node + " : ");
- }
- out.println("");
- }
- }
- //
- // Dump blocks from pendingReplication
- //
- pendingReplications.metaSave(out);
-
- //
- // Dump blocks that are waiting to be deleted
- //
- dumpRecentInvalidateSets(out);
+ blockManager.metaSave(out);
//
// Dump all datanodes
@@ -597,28 +475,6 @@
private boolean isAccessTimeSupported() {
return accessTimePrecision > 0;
}
-
- /* get replication factor of a block */
- private int getReplication(Block block) {
- INodeFile fileINode = blocksMap.getINode(block);
- if (fileINode == null) { // block does not belong to any file
- return 0;
- }
- assert !fileINode.isDirectory() : "Block cannot belong to a directory.";
- return fileINode.getReplication();
- }
-
- /* updates a block in under replication queue */
- synchronized void updateNeededReplications(Block block,
- int curReplicasDelta, int expectedReplicasDelta) {
- NumberReplicas repl = countNodes(block);
- int curExpectedReplicas = getReplication(block);
- neededReplications.update(block,
- repl.liveReplicas(),
- repl.decommissionedReplicas(),
- curExpectedReplicas,
- curReplicasDelta, expectedReplicasDelta);
- }
/////////////////////////////////////////////////////////
//
@@ -641,7 +497,7 @@
NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+ "Asking for blocks from an unrecorded node " + datanode.getName());
throw new IllegalArgumentException(
- "Unexpected exception. Got getBlocks message for datanode " +
+ "Unexpected exception. Got getBlocks message for datanode " +
datanode.getName() + ", but there is no info for it");
}
@@ -666,27 +522,17 @@
totalSize += addBlock(iter.next(), results);
}
}
-
+
return new BlocksWithLocations(
results.toArray(new BlockWithLocations[results.size()]));
}
-
+
/**
* Get all valid locations of the block & add the block to results
* return the length of the added block; 0 if the block is not added
*/
private long addBlock(Block block, List<BlockWithLocations> results) {
- ArrayList<String> machineSet =
- new ArrayList<String>(blocksMap.numNodes(block));
- for(Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(block); it.hasNext();) {
- String storageID = it.next().getStorageID();
- // filter invalidate replicas
- Collection<Block> blocks = recentInvalidateSets.get(storageID);
- if(blocks==null || !blocks.contains(block)) {
- machineSet.add(storageID);
- }
- }
+ ArrayList<String> machineSet = blockManager.addBlock(block);
if(machineSet.size() == 0) {
return 0;
} else {
@@ -821,58 +667,9 @@
if (blocks.length == 0) {
return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
}
- List<LocatedBlock> results;
- results = new ArrayList<LocatedBlock>(blocks.length);
-
- int curBlk = 0;
- long curPos = 0, blkSize = 0;
- int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
- for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
- blkSize = blocks[curBlk].getNumBytes();
- assert blkSize > 0 : "Block of size 0";
- if (curPos + blkSize > offset) {
- break;
- }
- curPos += blkSize;
- }
-
- if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
- return null;
-
- long endOff = offset + length;
-
- do {
- // get block locations
- int numNodes = blocksMap.numNodes(blocks[curBlk]);
- int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
- int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]);
- if (numCorruptNodes != numCorruptReplicas) {
- LOG.warn("Inconsistent number of corrupt replicas for " +
- blocks[curBlk] + "blockMap has " + numCorruptNodes +
- " but corrupt replicas map has " + numCorruptReplicas);
- }
- boolean blockCorrupt = (numCorruptNodes == numNodes);
- int numMachineSet = blockCorrupt ? numNodes :
- (numNodes - numCorruptNodes);
- DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
- if (numMachineSet > 0) {
- numNodes = 0;
- for(Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
- if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
- machineSet[numNodes++] = dn;
- }
- }
- results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
- blockCorrupt));
- curPos += blocks[curBlk].getNumBytes();
- curBlk++;
- } while (curPos < endOff
- && curBlk < blocks.length
- && results.size() < nrBlocksToReturn);
+ List<LocatedBlock> results = blockManager.getBlockLocations(blocks,
+ offset, length, nrBlocksToReturn);
return inode.createLocatedBlocks(results);
}
@@ -935,7 +732,7 @@
) throws IOException {
if (isInSafeMode())
throw new SafeModeException("Cannot set replication for " + src, safeMode);
- verifyReplication(src, replication, null);
+ blockManager.verifyReplication(src, replication, null);
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.WRITE);
}
@@ -951,14 +748,14 @@
// update needReplication priority queues
for(int idx = 0; idx < fileBlocks.length; idx++)
- updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
+ blockManager.updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
if (oldRepl > replication) {
// old replication > the new one; need to remove copies
LOG.info("Reducing replication for file " + src
+ ". New replication is " + replication);
for(int idx = 0; idx < fileBlocks.length; idx++)
- processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
+ blockManager.processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
} else { // replication factor is increased
LOG.info("Increasing replication for file " + src
+ ". New replication is " + replication);
@@ -972,33 +769,6 @@
}
return dir.getPreferredBlockSize(filename);
}
-
- /**
- * Check whether the replication parameter is within the range
- * determined by system configuration.
- */
- private void verifyReplication(String src,
- short replication,
- String clientName
- ) throws IOException {
-
- if (replication >= minReplication && replication <= maxReplication) {
- //common case. avoid building 'text'
- return;
- }
-
- String text = "file " + src
- + ((clientName != null) ? " on client " + clientName : "")
- + ".\n"
- + "Requested replication " + replication;
-
- if (replication > maxReplication)
- throw new IOException(text + " exceeds maximum " + maxReplication);
-
- if (replication < minReplication)
- throw new IOException(
- text + " is less than the required minimum " + minReplication);
- }
/**
* Create a new file entry in the namespace.
@@ -1107,7 +877,7 @@
}
try {
- verifyReplication(src, replication, clientMachine);
+ blockManager.verifyReplication(src, replication, clientMachine);
} catch(IOException e) {
throw new IOException("failed to create "+e.getMessage());
}
@@ -1188,7 +958,7 @@
" Please refer to dfs.support.append configuration parameter.");
}
startFileInternal(src, null, holder, clientMachine, false, true,
- (short)maxReplication, (long)0);
+ (short)blockManager.maxReplication, (long)0);
getEditLog().logSync();
//
@@ -1203,15 +973,11 @@
Block[] blocks = file.getBlocks();
if (blocks != null && blocks.length > 0) {
Block last = blocks[blocks.length-1];
- BlockInfo storedBlock = blocksMap.getStoredBlock(last);
+ BlockInfo storedBlock = blockManager.getStoredBlock(last);
if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
long fileLength = file.computeContentSummary().getLength();
- DatanodeDescriptor[] targets = new DatanodeDescriptor[blocksMap.numNodes(last)];
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
- for (int i = 0; it != null && it.hasNext(); i++) {
- targets[i] = it.next();
- }
- // remove the replica locations of this block from the blocksMap
+ DatanodeDescriptor[] targets = blockManager.getNodes(last);
+ // remove the replica locations of this block from the node
for (int i = 0; i < targets.length; i++) {
targets[i].removeBlock(storedBlock);
}
@@ -1222,17 +988,14 @@
fileLength-storedBlock.getNumBytes());
// Remove block from replication queue.
- updateNeededReplications(last, 0, 0);
+ blockManager.updateNeededReplications(last, 0, 0);
// remove this block from the list of pending blocks to be deleted.
// This reduces the possibility of triggering HADOOP-1349.
//
for (DatanodeDescriptor dd : targets) {
String datanodeId = dd.getStorageID();
- Collection<Block> v = recentInvalidateSets.get(datanodeId);
- if (v != null && v.remove(last) && v.isEmpty()) {
- recentInvalidateSets.remove(datanodeId);
- }
+ blockManager.removeFromInvalidates(datanodeId, last);
}
}
}
@@ -1298,15 +1061,13 @@
replication = (int)pendingFile.getReplication();
}
- // choose targets for the new block tobe allocated.
- DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
- clientNode,
- null,
- blockSize);
- if (targets.length < this.minReplication) {
+ // choose targets for the new block to be allocated.
+ DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
+ replication, clientNode, null, blockSize);
+ if (targets.length < blockManager.minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
targets.length + " nodes, instead of " +
- minReplication);
+ blockManager.minReplication);
}
// Allocate a new block and record it in the INode.
@@ -1447,14 +1208,7 @@
Block[] pendingBlocks = file.getBlocks();
int nrBlocks = pendingBlocks.length;
for (int i = 0; i < nrBlocks; i++) {
- // filter out containingNodes that are marked for decommission.
- NumberReplicas number = countNodes(pendingBlocks[i]);
- if (number.liveReplicas() < numExpectedReplicas) {
- neededReplications.add(pendingBlocks[i],
- number.liveReplicas(),
- number.decommissionedReplicas,
- numExpectedReplicas);
- }
+ blockManager.checkReplication(pendingBlocks[i], numExpectedReplicas);
}
}
@@ -1490,7 +1244,7 @@
// check all blocks of the file.
//
for (Block block: v.getBlocks()) {
- if (blocksMap.numNodes(block) < this.minReplication) {
+ if (!blockManager.checkMinReplication(block)) {
return false;
}
}
@@ -1499,78 +1253,13 @@
// check the penultimate block of this file
//
Block b = v.getPenultimateBlock();
- if (b != null) {
- if (blocksMap.numNodes(b) < this.minReplication) {
- return false;
- }
+ if (b != null && !blockManager.checkMinReplication(b)) {
+ return false;
}
}
return true;
}
- /**
- * Remove a datanode from the invalidatesSet
- * @param n datanode
- */
- private void removeFromInvalidates(DatanodeInfo n) {
- recentInvalidateSets.remove(n.getStorageID());
- }
-
- /**
- * Adds block to list of blocks which will be invalidated on
- * specified datanode and log the move
- * @param b block
- * @param n datanode
- */
- void addToInvalidates(Block b, DatanodeInfo n) {
- addToInvalidatesNoLog(b, n);
- NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
- + b.getBlockName() + " is added to invalidSet of " + n.getName());
- }
-
- /**
- * Adds block to list of blocks which will be invalidated on
- * specified datanode
- * @param b block
- * @param n datanode
- */
- void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
- Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
- if (invalidateSet == null) {
- invalidateSet = new HashSet<Block>();
- recentInvalidateSets.put(n.getStorageID(), invalidateSet);
- }
- invalidateSet.add(b);
- }
-
- /**
- * Adds block to list of blocks which will be invalidated on
- * all its datanodes.
- */
- private void addToInvalidates(Block b) {
- for (Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(b); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- addToInvalidates(b, node);
- }
- }
-
- /**
- * dumps the contents of recentInvalidateSets
- */
- private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
- int size = recentInvalidateSets.values().size();
- out.println("Metasave: Blocks waiting deletion from "+size+" datanodes.");
- if (size == 0) {
- return;
- }
- for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
- Collection<Block> blocks = entry.getValue();
- if (blocks.size() > 0) {
- out.println(datanodeMap.get(entry.getKey()).getName() + blocks);
- }
- }
- }
/**
* Mark the block belonging to datanode as corrupt
@@ -1579,75 +1268,9 @@
*/
public synchronized void markBlockAsCorrupt(Block blk, DatanodeInfo dn)
throws IOException {
- DatanodeDescriptor node = getDatanode(dn);
- if (node == null) {
- throw new IOException("Cannot mark block" + blk.getBlockName() +
- " as corrupt because datanode " + dn.getName() +
- " does not exist. ");
- }
-
- final BlockInfo storedBlockInfo = blocksMap.getStoredBlock(blk);
- if (storedBlockInfo == null) {
- // Check if the replica is in the blockMap, if not
- // ignore the request for now. This could happen when BlockScanner
- // thread of Datanode reports bad block before Block reports are sent
- // by the Datanode on startup
- NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
- "block " + blk + " could not be marked " +
- "as corrupt as it does not exists in " +
- "blocksMap");
- } else {
- INodeFile inode = storedBlockInfo.getINode();
- if (inode == null) {
- NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
- "block " + blk + " could not be marked " +
- "as corrupt as it does not belong to " +
- "any file");
- addToInvalidates(storedBlockInfo, node);
- return;
- }
- // Add this replica to corruptReplicas Map
- corruptReplicas.addToCorruptReplicasMap(storedBlockInfo, node);
- if (countNodes(storedBlockInfo).liveReplicas()>inode.getReplication()) {
- // the block is over-replicated so invalidate the replicas immediately
- invalidateBlock(storedBlockInfo, node);
- } else {
- // add the block to neededReplication
- updateNeededReplications(storedBlockInfo, -1, 0);
- }
- }
+ blockManager.markBlockAsCorrupt(blk, dn);
}
- /**
- * Invalidates the given block on the given datanode.
- */
- public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
- throws IOException {
- NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
- + blk + " on "
- + dn.getName());
- DatanodeDescriptor node = getDatanode(dn);
- if (node == null) {
- throw new IOException("Cannot invalidate block " + blk +
- " because datanode " + dn.getName() +
- " does not exist.");
- }
-
- // Check how many copies we have of the block. If we have at least one
- // copy on a live node, then we can delete it.
- int count = countNodes(blk).liveReplicas();
- if (count > 1) {
- addToInvalidates(blk, dn);
- removeStoredBlock(blk, node);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
- + blk + " on "
- + dn.getName() + " listed for deletion.");
- } else {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
- + blk + " on "
- + dn.getName() + " is the only copy and was not deleted.");
- }
- }
////////////////////////////////////////////////////////////////
// Here's how to handle block-copy failure during client write:
@@ -1738,9 +1361,7 @@
void removePathAndBlocks(String src, List<Block> blocks) {
leaseManager.removeLeaseWithPrefixPath(src);
for(Block b : blocks) {
- blocksMap.removeINode(b);
- corruptReplicas.removeFromCorruptReplicasMap(b);
- addToInvalidates(b);
+ blockManager.removeBlock(b);
}
}
@@ -1883,16 +1504,11 @@
+ " internalReleaseLease: No blocks found, lease removed.");
return;
}
- // setup the Inode.targets for the last block from the blocksMap
+ // setup the Inode.targets for the last block from the blockManager
//
Block[] blocks = pendingFile.getBlocks();
Block last = blocks[blocks.length-1];
- DatanodeDescriptor[] targets =
- new DatanodeDescriptor[blocksMap.numNodes(last)];
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
- for (int i = 0; it != null && it.hasNext(); i++) {
- targets[i] = it.next();
- }
+ DatanodeDescriptor[] targets = blockManager.getNodes(last);
pendingFile.setTargets(targets);
}
// start lease recovery of the last block for this file.
@@ -1926,7 +1542,7 @@
+ ", closeFile=" + closeFile
+ ", deleteBlock=" + deleteblock
+ ")");
- final BlockInfo oldblockinfo = blocksMap.getStoredBlock(lastblock);
+ final BlockInfo oldblockinfo = blockManager.getStoredBlock(lastblock);
if (oldblockinfo == null) {
throw new IOException("Block (=" + lastblock + ") not found");
}
@@ -1941,7 +1557,7 @@
// Remove old block from blocks map. This always have to be done
// because the generation stamp of this block is changing.
- blocksMap.removeBlock(oldblockinfo);
+ blockManager.removeBlockFromMap(oldblockinfo);
if (deleteblock) {
pendingFile.removeBlock(lastblock);
@@ -1949,10 +1565,10 @@
else {
// update last block, construct newblockinfo and add it to the blocks map
lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
- final BlockInfo newblockinfo = blocksMap.addINode(lastblock, pendingFile);
+ final BlockInfo newblockinfo = blockManager.addINode(lastblock, pendingFile);
// find the DatanodeDescriptor objects
- // There should be no locations in the blocksMap till now because the
+ // There should be no locations in the blockManager till now because the
// file is underConstruction
DatanodeDescriptor[] descriptors = null;
if (newtargets.length > 0) {
@@ -1962,7 +1578,7 @@
}
}
if (closeFile) {
- // the file is getting closed. Insert block locations into blocksMap.
+ // the file is getting closed. Insert block locations into blockManager.
// Otherwise fsck will report these blocks as MISSING, especially if the
// blocksReceived from Datanodes take a long time to arrive.
for (int i = 0; i < descriptors.length; i++) {
@@ -2280,7 +1896,7 @@
ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2);
//check pending replication
cmd = nodeinfo.getReplicationCommand(
- maxReplicationStreams - xmitsInProgress);
+ blockManager.maxReplicationStreams - xmitsInProgress);
if (cmd != null) {
cmds.add(cmd);
}
@@ -2351,7 +1967,7 @@
while (fsRunning) {
try {
computeDatanodeWork();
- processPendingReplications();
+ blockManager.processPendingReplications();
Thread.sleep(replicationRecheckInterval);
} catch (InterruptedException ie) {
LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
@@ -2393,405 +2009,20 @@
* ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
}
- workFound = computeReplicationWork(blocksToProcess);
+ workFound = blockManager.computeReplicationWork(blocksToProcess);
// Update FSNamesystemMetrics counters
synchronized (this) {
- pendingReplicationBlocksCount = pendingReplications.size();
- underReplicatedBlocksCount = neededReplications.size();
- scheduledReplicationBlocksCount = workFound;
- corruptReplicaBlocksCount = corruptReplicas.size();
+ blockManager.updateState();
+ blockManager.scheduledReplicationBlocksCount = workFound;
}
- workFound += computeInvalidateWork(nodesToProcess);
+ workFound += blockManager.computeInvalidateWork(nodesToProcess);
return workFound;
}
- /**
- * Schedule blocks for deletion at datanodes
- * @param nodesToProcess number of datanodes to schedule deletion work
- * @return total number of block for deletion
- */
- int computeInvalidateWork(int nodesToProcess) {
- int numOfNodes = recentInvalidateSets.size();
- nodesToProcess = Math.min(numOfNodes, nodesToProcess);
-
- // get an array of the keys
- ArrayList<String> keyArray =
- new ArrayList<String>(recentInvalidateSets.keySet());
-
- // randomly pick up <i>nodesToProcess</i> nodes
- // and put them at [0, nodesToProcess)
- int remainingNodes = numOfNodes - nodesToProcess;
- if (nodesToProcess < remainingNodes) {
- for(int i=0; i<nodesToProcess; i++) {
- int keyIndex = r.nextInt(numOfNodes-i)+i;
- Collections.swap(keyArray, keyIndex, i); // swap to front
- }
- } else {
- for(int i=0; i<remainingNodes; i++) {
- int keyIndex = r.nextInt(numOfNodes-i);
- Collections.swap(keyArray, keyIndex, numOfNodes-i-1); // swap to end
- }
- }
-
- int blockCnt = 0;
- for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
- blockCnt += invalidateWorkForOneNode(keyArray.get(nodeCnt));
- }
- return blockCnt;
- }
-
- /**
- * Scan blocks in {@link #neededReplications} and assign replication
- * work to data-nodes they belong to.
- *
- * The number of process blocks equals either twice the number of live
- * data-nodes or the number of under-replicated blocks whichever is less.
- *
- * @return number of blocks scheduled for replication during this iteration.
- */
- private int computeReplicationWork(
- int blocksToProcess) throws IOException {
- // Choose the blocks to be replicated
- List<List<Block>> blocksToReplicate =
- chooseUnderReplicatedBlocks(blocksToProcess);
-
- // replicate blocks
- int scheduledReplicationCount = 0;
- for (int i=0; i<blocksToReplicate.size(); i++) {
- for(Block block : blocksToReplicate.get(i)) {
- if (computeReplicationWorkForBlock(block, i)) {
- scheduledReplicationCount++;
- }
- }
- }
- return scheduledReplicationCount;
- }
-
- /** Get a list of block lists to be replicated
- * The index of block lists represents the
- *
- * @param blocksToProcess
- * @return Return a list of block lists to be replicated.
- * The block list index represents its replication priority.
- */
- synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
- // initialize data structure for the return value
- List<List<Block>> blocksToReplicate =
- new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);
- for (int i=0; i<UnderReplicatedBlocks.LEVEL; i++) {
- blocksToReplicate.add(new ArrayList<Block>());
- }
-
- synchronized(neededReplications) {
- if (neededReplications.size() == 0) {
- missingBlocksInCurIter = 0;
- missingBlocksInPrevIter = 0;
- return blocksToReplicate;
- }
-
- // Go through all blocks that need replications.
- BlockIterator neededReplicationsIterator = neededReplications.iterator();
- // skip to the first unprocessed block, which is at replIndex
- for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
- neededReplicationsIterator.next();
- }
- // # of blocks to process equals either twice the number of live
- // data-nodes or the number of under-replicated blocks whichever is less
- blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
-
- for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
- if( ! neededReplicationsIterator.hasNext()) {
- // start from the beginning
- replIndex = 0;
- missingBlocksInPrevIter = missingBlocksInCurIter;
- missingBlocksInCurIter = 0;
- blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
- if(blkCnt >= blocksToProcess)
- break;
- neededReplicationsIterator = neededReplications.iterator();
- assert neededReplicationsIterator.hasNext() :
- "neededReplications should not be empty.";
- }
-
- Block block = neededReplicationsIterator.next();
- int priority = neededReplicationsIterator.getPriority();
- if (priority < 0 || priority >= blocksToReplicate.size()) {
- LOG.warn("Unexpected replication priority: " + priority + " " + block);
- } else {
- blocksToReplicate.get(priority).add(block);
- }
- } // end for
- } // end synchronized
- return blocksToReplicate;
- }
-
- /** Replicate a block
- *
- * @param block block to be replicated
- * @param priority a hint of its priority in the neededReplication queue
- * @return if the block gets replicated or not
- */
- boolean computeReplicationWorkForBlock(Block block, int priority) {
- int requiredReplication, numEffectiveReplicas;
- List<DatanodeDescriptor> containingNodes;
- DatanodeDescriptor srcNode;
-
- synchronized (this) {
- synchronized (neededReplications) {
- // block should belong to a file
- INodeFile fileINode = blocksMap.getINode(block);
- // abandoned block or block reopened for append
- if(fileINode == null || fileINode.isUnderConstruction()) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- return false;
- }
- requiredReplication = fileINode.getReplication();
-
- // get a source data-node
- containingNodes = new ArrayList<DatanodeDescriptor>();
- NumberReplicas numReplicas = new NumberReplicas();
- srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
- if ((numReplicas.liveReplicas() + numReplicas.decommissionedReplicas())
- <= 0) {
- missingBlocksInCurIter++;
- }
- if(srcNode == null) // block can not be replicated from any node
- return false;
-
- // do not schedule more if enough replicas is already pending
- numEffectiveReplicas = numReplicas.liveReplicas() +
- pendingReplications.getNumReplicas(block);
- if(numEffectiveReplicas >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block
- + " from neededReplications as it has enough replicas.");
- return false;
- }
- }
- }
-
- // choose replication targets: NOT HODING THE GLOBAL LOCK
- DatanodeDescriptor targets[] = replicator.chooseTarget(
- requiredReplication - numEffectiveReplicas,
- srcNode, containingNodes, null, block.getNumBytes());
- if(targets.length == 0)
- return false;
-
- synchronized (this) {
- synchronized (neededReplications) {
- // Recheck since global lock was released
- // block should belong to a file
- INodeFile fileINode = blocksMap.getINode(block);
- // abandoned block or block reopened for append
- if(fileINode == null || fileINode.isUnderConstruction()) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- return false;
- }
- requiredReplication = fileINode.getReplication();
-
- // do not schedule more if enough replicas is already pending
- NumberReplicas numReplicas = countNodes(block);
- numEffectiveReplicas = numReplicas.liveReplicas() +
- pendingReplications.getNumReplicas(block);
- if(numEffectiveReplicas >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block
- + " from neededReplications as it has enough replicas.");
- return false;
- }
-
- // Add block to the to be replicated list
- srcNode.addBlockToBeReplicated(block, targets);
-
- for (DatanodeDescriptor dn : targets) {
- dn.incBlocksScheduled();
- }
-
- // Move the block-replication into a "pending" state.
- // The reason we use 'pending' is so we can retry
- // replications that fail after an appropriate amount of time.
- pendingReplications.add(block, targets.length);
- NameNode.stateChangeLog.debug(
- "BLOCK* block " + block
- + " is moved from neededReplications to pendingReplications");
-
- // remove from neededReplications
- if(numEffectiveReplicas + targets.length >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- }
- if (NameNode.stateChangeLog.isInfoEnabled()) {
- StringBuffer targetList = new StringBuffer("datanode(s)");
- for (int k = 0; k < targets.length; k++) {
- targetList.append(' ');
- targetList.append(targets[k].getName());
- }
- NameNode.stateChangeLog.info(
- "BLOCK* ask "
- + srcNode.getName() + " to replicate "
- + block + " to " + targetList);
- NameNode.stateChangeLog.debug(
- "BLOCK* neededReplications = " + neededReplications.size()
- + " pendingReplications = " + pendingReplications.size());
- }
- }
- }
-
- return true;
- }
-
- /**
- * Parse the data-nodes the block belongs to and choose one,
- * which will be the replication source.
- *
- * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
- * since the former do not have write traffic and hence are less busy.
- * We do not use already decommissioned nodes as a source.
- * Otherwise we choose a random node among those that did not reach their
- * replication limit.
- *
- * In addition form a list of all nodes containing the block
- * and calculate its replication numbers.
- */
- private DatanodeDescriptor chooseSourceDatanode(
- Block block,
- List<DatanodeDescriptor> containingNodes,
- NumberReplicas numReplicas) {
- containingNodes.clear();
- DatanodeDescriptor srcNode = null;
- int live = 0;
- int decommissioned = 0;
- int corrupt = 0;
- int excess = 0;
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
- Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
- while(it.hasNext()) {
- DatanodeDescriptor node = it.next();
- Collection<Block> excessBlocks =
- excessReplicateMap.get(node.getStorageID());
- if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
- corrupt++;
- else if (node.isDecommissionInProgress() || node.isDecommissioned())
- decommissioned++;
- else if (excessBlocks != null && excessBlocks.contains(block)) {
- excess++;
- } else {
- live++;
- }
- containingNodes.add(node);
- // Check if this replica is corrupt
- // If so, do not select the node as src node
- if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
- continue;
- if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
- continue; // already reached replication limit
- // the block must not be scheduled for removal on srcNode
- if(excessBlocks != null && excessBlocks.contains(block))
- continue;
- // never use already decommissioned nodes
- if(node.isDecommissioned())
- continue;
- // we prefer nodes that are in DECOMMISSION_INPROGRESS state
- if(node.isDecommissionInProgress() || srcNode == null) {
- srcNode = node;
- continue;
- }
- if(srcNode.isDecommissionInProgress())
- continue;
- // switch to a different node randomly
- // this to prevent from deterministically selecting the same node even
- // if the node failed to replicate the block on previous iterations
- if(r.nextBoolean())
- srcNode = node;
- }
- if(numReplicas != null)
- numReplicas.initialize(live, decommissioned, corrupt, excess);
- return srcNode;
- }
-
- /**
- * Get blocks to invalidate for <i>nodeId</i>
- * in {@link #recentInvalidateSets}.
- *
- * @return number of blocks scheduled for removal during this iteration.
- */
- private synchronized int invalidateWorkForOneNode(String nodeId) {
- // blocks should not be replicated or removed if safe mode is on
- if (isInSafeMode())
- return 0;
- // get blocks to invalidate for the nodeId
- assert nodeId != null;
- DatanodeDescriptor dn = datanodeMap.get(nodeId);
- if (dn == null) {
- recentInvalidateSets.remove(nodeId);
- return 0;
- }
-
- Collection<Block> invalidateSet = recentInvalidateSets.get(nodeId);
- if (invalidateSet == null)
- return 0;
-
- ArrayList<Block> blocksToInvalidate =
- new ArrayList<Block>(blockInvalidateLimit);
-
- // # blocks that can be sent in one message is limited
- Iterator<Block> it = invalidateSet.iterator();
- for(int blkCount = 0; blkCount < blockInvalidateLimit && it.hasNext();
- blkCount++) {
- blocksToInvalidate.add(it.next());
- it.remove();
- }
-
- // If we send everything in this message, remove this node entry
- if(!it.hasNext())
- recentInvalidateSets.remove(nodeId);
-
- dn.addBlocksToBeInvalidated(blocksToInvalidate);
-
- if(NameNode.stateChangeLog.isInfoEnabled()) {
- StringBuffer blockList = new StringBuffer();
- for(Block blk : blocksToInvalidate) {
- blockList.append(' ');
- blockList.append(blk);
- }
- NameNode.stateChangeLog.info("BLOCK* ask "
- + dn.getName() + " to delete " + blockList);
- }
- return blocksToInvalidate.size();
- }
-
public void setNodeReplicationLimit(int limit) {
- this.maxReplicationStreams = limit;
- }
-
- /**
- * If there were any replication requests that timed out, reap them
- * and put them back into the neededReplication queue
- */
- void processPendingReplications() {
- Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
- if (timedOutItems != null) {
- synchronized (this) {
- for (int i = 0; i < timedOutItems.length; i++) {
- NumberReplicas num = countNodes(timedOutItems[i]);
- neededReplications.add(timedOutItems[i],
- num.liveReplicas(),
- num.decommissionedReplicas(),
- getReplication(timedOutItems[i]));
- }
- }
- /* If we know the target datanodes where the replication timedout,
- * we could invoke decBlocksScheduled() on it. Its ok for now.
- */
- }
+ blockManager.maxReplicationStreams = limit;
}
/**
@@ -2823,7 +2054,7 @@
}
for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext();) {
- removeStoredBlock(it.next(), nodeInfo);
+ blockManager.removeStoredBlock(it.next(), nodeInfo);
}
unprotectedRemoveDatanode(nodeInfo);
clusterMap.remove(nodeInfo);
@@ -2831,7 +2062,7 @@
void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
nodeDescr.resetBlocks();
- removeFromInvalidates(nodeDescr);
+ blockManager.removeFromInvalidates(nodeDescr);
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.unprotectedRemoveDatanode: "
+ nodeDescr.getName() + " is out of service now.");
@@ -2948,300 +2179,11 @@
throw new DisallowedDatanodeException(node);
}
- //
- // Modify the (block-->datanode) map, according to the difference
- // between the old and new block report.
- //
- Collection<Block> toAdd = new LinkedList<Block>();
- Collection<Block> toRemove = new LinkedList<Block>();
- Collection<Block> toInvalidate = new LinkedList<Block>();
- node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate);
-
- for (Block b : toRemove) {
- removeStoredBlock(b, node);
- }
- for (Block b : toAdd) {
- addStoredBlock(b, node, null);
- }
- for (Block b : toInvalidate) {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
- + b + " on " + node.getName() + " size " + b.getNumBytes()
- + " does not belong to any file.");
- addToInvalidates(b, node);
- }
+ blockManager.processReport(node, newReport);
NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
}
/**
- * Modify (block-->datanode) map. Remove block from set of
- * needed replications if this takes care of the problem.
- * @return the block that is stored in blockMap.
- */
- synchronized Block addStoredBlock(Block block,
- DatanodeDescriptor node,
- DatanodeDescriptor delNodeHint) {
- BlockInfo storedBlock = blocksMap.getStoredBlock(block);
- if(storedBlock == null || storedBlock.getINode() == null) {
- // If this block does not belong to anyfile, then we are done.
- NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
- + "addStoredBlock request received for "
- + block + " on " + node.getName()
- + " size " + block.getNumBytes()
- + " But it does not belong to any file.");
- // we could add this block to invalidate set of this datanode.
- // it will happen in next block report otherwise.
- return block;
- }
-
- // add block to the data-node
- boolean added = node.addBlock(storedBlock);
-
- assert storedBlock != null : "Block must be stored by now";
-
- if (block != storedBlock) {
- if (block.getNumBytes() >= 0) {
- long cursize = storedBlock.getNumBytes();
- if (cursize == 0) {
- storedBlock.setNumBytes(block.getNumBytes());
- } else if (cursize != block.getNumBytes()) {
- LOG.warn("Inconsistent size for block " + block +
- " reported from " + node.getName() +
- " current size is " + cursize +
- " reported size is " + block.getNumBytes());
- try {
- if (cursize > block.getNumBytes()) {
- // new replica is smaller in size than existing block.
- // Mark the new replica as corrupt.
- LOG.warn("Mark new replica " + block + " from " + node.getName() +
- "as corrupt because its length is shorter than existing ones");
- markBlockAsCorrupt(block, node);
- } else {
- // new replica is larger in size than existing block.
- // Mark pre-existing replicas as corrupt.
- int numNodes = blocksMap.numNodes(block);
- int count = 0;
- DatanodeDescriptor nodes[] = new DatanodeDescriptor[numNodes];
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
- for (; it != null && it.hasNext(); ) {
- DatanodeDescriptor dd = it.next();
- if (!dd.equals(node)) {
- nodes[count++] = dd;
- }
- }
- for (int j = 0; j < count; j++) {
- LOG.warn("Mark existing replica " + block + " from " + node.getName() +
- " as corrupt because its length is shorter than the new one");
- markBlockAsCorrupt(block, nodes[j]);
- }
- //
- // change the size of block in blocksMap
- //
- storedBlock = blocksMap.getStoredBlock(block); //extra look up!
- if (storedBlock == null) {
- LOG.warn("Block " + block +
- " reported from " + node.getName() +
- " does not exist in blockMap. Surprise! Surprise!");
- } else {
- storedBlock.setNumBytes(block.getNumBytes());
- }
- }
- } catch (IOException e) {
- LOG.warn("Error in deleting bad block " + block + e);
- }
- }
-
- //Updated space consumed if required.
- INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
- long diff = (file == null) ? 0 :
- (file.getPreferredBlockSize() - storedBlock.getNumBytes());
-
- if (diff > 0 && file.isUnderConstruction() &&
- cursize < storedBlock.getNumBytes()) {
- try {
- String path = /* For finding parents */
- leaseManager.findPath((INodeFileUnderConstruction)file);
- dir.updateSpaceConsumed(path, 0, -diff*file.getReplication());
- } catch (IOException e) {
- LOG.warn("Unexpected exception while updating disk space : " +
- e.getMessage());
- }
- }
- }
- block = storedBlock;
- }
- assert storedBlock == block : "Block must be stored by now";
-
- int curReplicaDelta = 0;
-
- if (added) {
- curReplicaDelta = 1;
- //
- // At startup time, because too many new blocks come in
- // they take up lots of space in the log file.
- // So, we log only when namenode is out of safemode.
- //
- if (!isInSafeMode()) {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
- +"blockMap updated: "+node.getName()+" is added to "+block+" size "+block.getNumBytes());
- }
- } else {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
- + "Redundant addStoredBlock request received for "
- + block + " on " + node.getName()
- + " size " + block.getNumBytes());
- }
-
- // filter out containingNodes that are marked for decommission.
- NumberReplicas num = countNodes(storedBlock);
- int numLiveReplicas = num.liveReplicas();
- int numCurrentReplica = numLiveReplicas
- + pendingReplications.getNumReplicas(block);
-
- // check whether safe replication is reached for the block
- incrementSafeBlockCount(numCurrentReplica);
-
- //
- // if file is being actively written to, then do not check
- // replication-factor here. It will be checked when the file is closed.
- //
- INodeFile fileINode = null;
- fileINode = storedBlock.getINode();
- if (fileINode.isUnderConstruction()) {
- return block;
- }
-
- // do not handle mis-replicated blocks during startup
- if(isInSafeMode())
- return block;
-
- // handle underReplication/overReplication
- short fileReplication = fileINode.getReplication();
- if (numCurrentReplica >= fileReplication) {
- neededReplications.remove(block, numCurrentReplica,
- num.decommissionedReplicas, fileReplication);
- } else {
- updateNeededReplications(block, curReplicaDelta, 0);
- }
- if (numCurrentReplica > fileReplication) {
- processOverReplicatedBlock(block, fileReplication, node, delNodeHint);
- }
- // If the file replication has reached desired value
- // we can remove any corrupt replicas the block may have
- int corruptReplicasCount = corruptReplicas.numCorruptReplicas(block);
- int numCorruptNodes = num.corruptReplicas();
- if ( numCorruptNodes != corruptReplicasCount) {
- LOG.warn("Inconsistent number of corrupt replicas for " +
- block + "blockMap has " + numCorruptNodes +
- " but corrupt replicas map has " + corruptReplicasCount);
- }
- if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
- invalidateCorruptReplicas(block);
- return block;
- }
-
- /**
- * Invalidate corrupt replicas.
- * <p>
- * This will remove the replicas from the block's location list,
- * add them to {@link #recentInvalidateSets} so that they could be further
- * deleted from the respective data-nodes,
- * and remove the block from corruptReplicasMap.
- * <p>
- * This method should be called when the block has sufficient
- * number of live replicas.
- *
- * @param blk Block whose corrupt replicas need to be invalidated
- */
- void invalidateCorruptReplicas(Block blk) {
- Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
- boolean gotException = false;
- if (nodes == null)
- return;
- for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
- DatanodeDescriptor node = it.next();
- try {
- invalidateBlock(blk, node);
- } catch (IOException e) {
- NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
- "error in deleting bad block " + blk +
- " on " + node + e);
- gotException = true;
- }
- }
- // Remove the block from corruptReplicasMap
- if (!gotException)
- corruptReplicas.removeFromCorruptReplicasMap(blk);
- }
-
- /**
- * For each block in the name-node verify whether it belongs to any file,
- * over or under replicated. Place it into the respective queue.
- */
- private synchronized void processMisReplicatedBlocks() {
- long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
- neededReplications.clear();
- for(BlocksMap.BlockInfo block : blocksMap.getBlocks()) {
- INodeFile fileINode = block.getINode();
- if(fileINode == null) {
- // block does not belong to any file
- nrInvalid++;
- addToInvalidates(block);
- continue;
- }
- // calculate current replication
- short expectedReplication = fileINode.getReplication();
- NumberReplicas num = countNodes(block);
- int numCurrentReplica = num.liveReplicas();
- // add to under-replicated queue if need to be
- if (neededReplications.add(block,
- numCurrentReplica,
- num.decommissionedReplicas(),
- expectedReplication)) {
- nrUnderReplicated++;
- }
-
- if (numCurrentReplica > expectedReplication) {
- // over-replicated block
- nrOverReplicated++;
- processOverReplicatedBlock(block, expectedReplication, null, null);
- }
- }
- LOG.info("Total number of blocks = " + blocksMap.size());
- LOG.info("Number of invalid blocks = " + nrInvalid);
- LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
- LOG.info("Number of over-replicated blocks = " + nrOverReplicated);
- }
-
- /**
- * Find how many of the containing nodes are "extra", if any.
- * If there are any extras, call chooseExcessReplicates() to
- * mark them in the excessReplicateMap.
- */
- private void processOverReplicatedBlock(Block block, short replication,
- DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
- if(addedNode == delNodeHint) {
- delNodeHint = null;
- }
- Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
- Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block);
- for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
- it.hasNext();) {
- DatanodeDescriptor cur = it.next();
- Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
- if (excessBlocks == null || !excessBlocks.contains(block)) {
- if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
- // exclude corrupt replicas
- if (corruptNodes == null || !corruptNodes.contains(cur)) {
- nonExcess.add(cur);
- }
- }
- }
- }
- chooseExcessReplicates(nonExcess, block, replication,
- addedNode, delNodeHint);
- }
-
- /**
* We want "replication" replicates for the block, but we now have too many.
* In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
*
@@ -3334,15 +2276,7 @@
}
nonExcess.remove(cur);
-
- Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
- if (excessBlocks == null) {
- excessBlocks = new TreeSet<Block>();
- excessReplicateMap.put(cur.getStorageID(), excessBlocks);
- }
- excessBlocks.add(b);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
- +"("+cur.getName()+", "+b+") is added to excessReplicateMap");
+ blockManager.addToExcessReplicate(cur, b);
//
// The 'excessblocks' tracks blocks until we get confirmation
@@ -3353,54 +2287,12 @@
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the namenode.
//
- addToInvalidatesNoLog(b, cur);
+ blockManager.addToInvalidates(b, cur);
NameNode.stateChangeLog.info("BLOCK* NameSystem.chooseExcessReplicates: "
+"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
}
}
- /**
- * Modify (block-->datanode) map. Possibly generate
- * replication tasks, if the removed block is still valid.
- */
- synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block + " from "+node.getName());
- if (!blocksMap.removeNode(block, node)) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block+" has already been removed from node "+node);
- return;
- }
-
- //
- // It's possible that the block was removed because of a datanode
- // failure. If the block is still valid, check if replication is
- // necessary. In that case, put block on a possibly-will-
- // be-replicated list.
- //
- INode fileINode = blocksMap.getINode(block);
- if (fileINode != null) {
- decrementSafeBlockCount(block);
- updateNeededReplications(block, -1, 0);
- }
-
- //
- // We've removed a block from a node, so it's definitely no longer
- // in "excess" there.
- //
- Collection<Block> excessBlocks = excessReplicateMap.get(node.getStorageID());
- if (excessBlocks != null) {
- excessBlocks.remove(block);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block+" is removed from excessBlocks");
- if (excessBlocks.size() == 0) {
- excessReplicateMap.remove(node.getStorageID());
- }
- }
-
- // Remove the replica from corruptReplicas
- corruptReplicas.removeFromCorruptReplicasMap(block, node);
- }
/**
* The given node is reporting that it received a certain block.
@@ -3430,39 +2322,20 @@
throw new DisallowedDatanodeException(node);
}
- // decrement number of blocks scheduled to this datanode.
- node.decBlocksScheduled();
-
- // get the deletion hint node
- DatanodeDescriptor delHintNode = null;
- if(delHint!=null && delHint.length()!=0) {
- delHintNode = datanodeMap.get(delHint);
- if(delHintNode == null) {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
- + block
- + " is expected to be removed from an unrecorded node "
- + delHint);
- }
- }
-
- //
- // Modify the blocks->datanode map and node's map.
- //
- pendingReplications.remove(block);
- addStoredBlock(block, node, delHintNode );
+ blockManager.addBlock(node, block, delHint);
}
public long getMissingBlocksCount() {
// not locking
- return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);
+ return blockManager.getMissingBlocksCount();
}
long[] getStats() {
synchronized(heartbeats) {
return new long[] {this.capacityTotal, this.capacityUsed,
this.capacityRemaining,
- this.underReplicatedBlocksCount,
- this.corruptReplicaBlocksCount,
+ getUnderReplicatedBlocks(),
+ getCorruptReplicaBlocksCount(),
getMissingBlocksCount()};
}
}
@@ -3682,7 +2555,7 @@
Iterator<Block> decommissionBlocks = node.getBlockIterator();
while(decommissionBlocks.hasNext()) {
Block block = decommissionBlocks.next();
- updateNeededReplications(block, -1, 0);
+ blockManager.updateNeededReplications(block, -1, 0);
}
}
}
@@ -3706,9 +2579,9 @@
return new Date(systemStart);
}
- short getMaxReplication() { return (short)maxReplication; }
- short getMinReplication() { return (short)minReplication; }
- short getDefaultReplication() { return (short)defaultReplication; }
+ short getMaxReplication() { return (short)blockManager.maxReplication; }
+ short getMinReplication() { return (short)blockManager.minReplication; }
+ short getDefaultReplication() { return (short)blockManager.defaultReplication; }
/**
* A immutable object that stores the number of live replicas and
@@ -3716,7 +2589,7 @@
*/
static class NumberReplicas {
private int liveReplicas;
- private int decommissionedReplicas;
+ int decommissionedReplicas;
private int corruptReplicas;
private int excessReplicas;
@@ -3750,79 +2623,6 @@
}
/**
- * Counts the number of nodes in the given list into active and
- * decommissioned counters.
- */
- private NumberReplicas countNodes(Block b,
- Iterator<DatanodeDescriptor> nodeIter) {
- int count = 0;
- int live = 0;
- int corrupt = 0;
- int excess = 0;
- Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
- while ( nodeIter.hasNext() ) {
- DatanodeDescriptor node = nodeIter.next();
- if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
- corrupt++;
- }
- else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- count++;
- }
- else {
- Collection<Block> blocksExcess =
- excessReplicateMap.get(node.getStorageID());
- if (blocksExcess != null && blocksExcess.contains(b)) {
- excess++;
- } else {
- live++;
- }
- }
- }
- return new NumberReplicas(live, count, corrupt, excess);
- }
-
- /**
- * Return the number of nodes that are live and decommissioned.
- */
- NumberReplicas countNodes(Block b) {
- return countNodes(b, blocksMap.nodeIterator(b));
- }
-
- /**
- * Return true if there are any blocks on this node that have not
- * yet reached their replication factor. Otherwise returns false.
- */
- private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
- boolean status = false;
- for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
- final Block block = i.next();
- INode fileINode = blocksMap.getINode(block);
-
- if (fileINode != null) {
- NumberReplicas num = countNodes(block);
- int curReplicas = num.liveReplicas();
- int curExpectedReplicas = getReplication(block);
- if (curExpectedReplicas > curReplicas) {
- status = true;
- if (!neededReplications.contains(block) &&
- pendingReplications.getNumReplicas(block) == 0) {
- //
- // These blocks have been reported from the datanode
- // after the startDecommission method has been executed. These
- // blocks were in flight when the decommission was started.
- //
- neededReplications.add(block,
- curReplicas,
- num.decommissionedReplicas(),
- curExpectedReplicas);
- }
- }
- }
- }
- return status;
- }
-
- /**
* Change, if appropriate, the admin state of a datanode to
* decommission completed. Return true if decommission is complete.
*/
@@ -3832,7 +2632,7 @@
// node has reached their target replication factor.
//
if (node.isDecommissionInProgress()) {
- if (!isReplicationInProgress(node)) {
+ if (!blockManager.isReplicationInProgress(node)) {
node.setDecommissioned();
LOG.info("Decommission complete for node " + node.getName());
}
@@ -4018,8 +2818,8 @@
* During name node startup {@link SafeModeInfo} counts the number of
* <em>safe blocks</em>, those that have at least the minimal number of
* replicas, and calculates the ratio of safe blocks to the total number
- * of blocks in the system, which is the size of
- * {@link FSNamesystem#blocksMap}. When the ratio reaches the
+ * of blocks in the system, which is the number of blocks tracked by
+ * {@link FSNamesystem#blockManager}. When the ratio reaches the
* {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
* to monitor whether the safe mode {@link #extension} is passed.
* Then it leaves safe mode and destroys itself.
@@ -4133,7 +2933,7 @@
}
}
// verify blocks replications
- processMisReplicatedBlocks();
+ blockManager.processMisReplicatedBlocks();
long timeInSafemode = now() - systemStart;
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
+ timeInSafemode/1000 + " secs.");
@@ -4148,7 +2948,7 @@
+clusterMap.getNumOfRacks()+" racks and "
+clusterMap.getNumOfLeaves()+ " datanodes");
NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
- +neededReplications.size()+" blocks");
+ +blockManager.neededReplications.size()+" blocks");
}
/**
@@ -4314,11 +3114,7 @@
if (blockTotal == -1 && blockSafe == -1) {
return true; // manual safe mode
}
- int activeBlocks = blocksMap.size();
- for(Iterator<Collection<Block>> it =
- recentInvalidateSets.values().iterator(); it.hasNext();) {
- activeBlocks -= it.next().size();
- }
+ int activeBlocks = blockManager.getActiveBlockCount();
return (blockTotal == activeBlocks) ||
(blockSafe >= 0 && blockSafe <= blockTotal);
}
@@ -4403,7 +3199,7 @@
void decrementSafeBlockCount(Block b) {
if (safeMode == null) // mostly true
return;
- safeMode.decrementSafeBlockCount((short)countNodes(b).liveReplicas());
+ safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
}
/**
@@ -4412,14 +3208,14 @@
void setBlockTotal() {
if (safeMode == null)
return;
- safeMode.setBlockTotal(blocksMap.size());
+ safeMode.setBlockTotal((int)getBlocksTotal());
}
/**
* Get the total number of blocks in the system.
*/
public long getBlocksTotal() {
- return blocksMap.size();
+ return blockManager.getTotalBlocks();
}
/**
@@ -4501,7 +3297,7 @@
* Returns whether the given block is one pointed-to by a file.
*/
private boolean isValidBlock(Block b) {
- return (blocksMap.getINode(b) != null);
+ return (blockManager.getINode(b) != null);
}
// Distributed upgrade manager
@@ -4616,20 +3412,20 @@
}
public long getPendingReplicationBlocks() {
- return pendingReplicationBlocksCount;
+ return blockManager.pendingReplicationBlocksCount;
}
public long getUnderReplicatedBlocks() {
- return underReplicatedBlocksCount;
+ return blockManager.underReplicatedBlocksCount;
}
/** Returns number of blocks with corrupt replicas */
public long getCorruptReplicaBlocksCount() {
- return corruptReplicaBlocksCount;
+ return blockManager.corruptReplicaBlocksCount;
}
public long getScheduledReplicationBlocks() {
- return scheduledReplicationBlocksCount;
+ return blockManager.scheduledReplicationBlocksCount;
}
public String getFSState() {
@@ -4738,7 +3534,7 @@
* Increments, logs and then returns the stamp
*/
synchronized long nextGenerationStampForBlock(Block block) throws IOException {
- BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+ BlockInfo storedBlock = blockManager.getStoredBlock(block);
if (storedBlock == null) {
String msg = block + " is already commited, storedBlock == null.";
LOG.info(msg);
@@ -4849,4 +3645,13 @@
" node namespaceID = " + registration.getNamespaceID());
getEditLog().releaseBackupStream(registration);
}
+
+ public int numCorruptReplicas(Block blk) {
+ return blockManager.numCorruptReplicas(blk);
+ }
+
+ /** Get the datanode descriptor for the given storage ID. */
+ DatanodeDescriptor getDatanode(String nodeID) {
+ return datanodeMap.get(nodeID);
+ }
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Wed May 6 22:32:34 2009
@@ -314,7 +314,7 @@
// Loop until all corrupt replicas are reported
int corruptReplicaSize = cluster.getNamesystem().
- corruptReplicas.numCorruptReplicas(blk);
+ numCorruptReplicas(blk);
while (corruptReplicaSize != numCorruptReplicas) {
try {
IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(),
@@ -328,7 +328,7 @@
} catch (InterruptedException ignore) {
}
corruptReplicaSize = cluster.getNamesystem().
- corruptReplicas.numCorruptReplicas(blk);
+ numCorruptReplicas(blk);
}
// Loop until the block recovers after replication
@@ -349,7 +349,7 @@
// Make sure the corrupt replica is invalidated and removed from
// corruptReplicasMap
corruptReplicaSize = cluster.getNamesystem().
- corruptReplicas.numCorruptReplicas(blk);
+ numCorruptReplicas(blk);
while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
try {
LOG.info("Looping until corrupt replica is invalidated");
@@ -357,7 +357,7 @@
} catch (InterruptedException ignore) {
}
corruptReplicaSize = cluster.getNamesystem().
- corruptReplicas.numCorruptReplicas(blk);
+ numCorruptReplicas(blk);
blocks = dfsClient.namenode.
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
replicaCount = blocks.get(0).getLocations().length;
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java Wed May 6 22:32:34 2009
@@ -31,23 +31,23 @@
for(int j=0; j<3*namesystem.blockInvalidateLimit+1; j++) {
Block block = new Block(i*(namesystem.blockInvalidateLimit+1)+j, 0,
GenerationStamp.FIRST_VALID_STAMP);
- namesystem.addToInvalidatesNoLog(block, nodes[i]);
+ namesystem.blockManager.addToInvalidates(block, nodes[i]);
}
}
assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
- namesystem.computeInvalidateWork(NUM_OF_DATANODES+1));
+ namesystem.blockManager.computeInvalidateWork(NUM_OF_DATANODES+1));
assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
- namesystem.computeInvalidateWork(NUM_OF_DATANODES));
+ namesystem.blockManager.computeInvalidateWork(NUM_OF_DATANODES));
assertEquals(namesystem.blockInvalidateLimit*(NUM_OF_DATANODES-1),
- namesystem.computeInvalidateWork(NUM_OF_DATANODES-1));
- int workCount = namesystem.computeInvalidateWork(1);
+ namesystem.blockManager.computeInvalidateWork(NUM_OF_DATANODES-1));
+ int workCount = namesystem.blockManager.computeInvalidateWork(1);
if (workCount == 1) {
assertEquals(namesystem.blockInvalidateLimit+1,
- namesystem.computeInvalidateWork(2));
+ namesystem.blockManager.computeInvalidateWork(2));
} else {
assertEquals(workCount, namesystem.blockInvalidateLimit);
- assertEquals(2, namesystem.computeInvalidateWork(2));
+ assertEquals(2, namesystem.blockManager.computeInvalidateWork(2));
}
}
} finally {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java Wed May 6 22:32:34 2009
@@ -62,16 +62,16 @@
NumberReplicas num = null;
do {
synchronized (namesystem) {
- num = namesystem.countNodes(block);
+ num = namesystem.blockManager.countNodes(block);
}
} while (num.excessReplicas() == 0);
// find out a non-excess node
- Iterator<DatanodeDescriptor> iter = namesystem.blocksMap.nodeIterator(block);
+ Iterator<DatanodeDescriptor> iter = namesystem.blockManager.blocksMap.nodeIterator(block);
DatanodeDescriptor nonExcessDN = null;
while (iter.hasNext()) {
DatanodeDescriptor dn = iter.next();
- Collection<Block> blocks = namesystem.excessReplicateMap.get(dn.getStorageID());
+ Collection<Block> blocks = namesystem.blockManager.excessReplicateMap.get(dn.getStorageID());
if (blocks == null || !blocks.contains(block) ) {
nonExcessDN = dn;
break;
@@ -89,7 +89,7 @@
// The block should be replicated
do {
- num = namesystem.countNodes(block);
+ num = namesystem.blockManager.countNodes(block);
} while (num.liveReplicas() != REPLICATION_FACTOR);
// restart the first datanode
@@ -98,7 +98,7 @@
// check if excessive replica is detected
do {
- num = namesystem.countNodes(block);
+ num = namesystem.blockManager.countNodes(block);
} while (num.excessReplicas() == 2);
} finally {
cluster.shutdown();
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java Wed May 6 22:32:34 2009
@@ -86,7 +86,7 @@
// corrupt one won't be chosen to be excess one
// without 4910 the number of live replicas would be 0: block gets lost
- assertEquals(1, namesystem.countNodes(block).liveReplicas());
+ assertEquals(1, namesystem.blockManager.countNodes(block).liveReplicas());
}
} finally {
cluster.shutdown();
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java Wed May 6 22:32:34 2009
@@ -63,7 +63,7 @@
throw (RuntimeException)new RuntimeException().initCause(e);
}
FSNamesystem fsNamesystem = namenode.getNamesystem();
- replicator = fsNamesystem.replicator;
+ replicator = fsNamesystem.blockManager.replicator;
cluster = fsNamesystem.clusterMap;
// construct network topology
for(int i=0; i<NUM_OF_DATANODES; i++) {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestUnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestUnderReplicatedBlocks.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestUnderReplicatedBlocks.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestUnderReplicatedBlocks.java Wed May 6 22:32:34 2009
@@ -27,9 +27,9 @@
// but the block does not get put into the under-replicated blocks queue
final FSNamesystem namesystem = cluster.getNamesystem();
Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
- DatanodeDescriptor dn = namesystem.blocksMap.nodeIterator(b).next();
- namesystem.addToInvalidates(b, dn);
- namesystem.blocksMap.removeNode(b, dn);
+ DatanodeDescriptor dn = namesystem.blockManager.blocksMap.nodeIterator(b).next();
+ namesystem.blockManager.addToInvalidates(b, dn);
+ namesystem.blockManager.blocksMap.removeNode(b, dn);
// increment this file's replication factor
FsShell shell = new FsShell(conf);