Posted to common-commits@hadoop.apache.org by to...@apache.org on 2007/04/06 22:29:23 UTC
svn commit: r526271 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/dfs/Block.java
src/java/org/apache/hadoop/dfs/FSDirectory.java
src/java/org/apache/hadoop/dfs/FSNamesystem.java
src/java/org/apache/hadoop/net/NetworkTopology.java
Author: tomwhite
Date: Fri Apr 6 13:29:23 2007
New Revision: 526271
URL: http://svn.apache.org/viewvc?view=rev&rev=526271
Log:
HADOOP-988. Change namenode to use a single map of blocks to metadata. Contributed by Raghu Angadi.
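[For readers skimming the patch: before this change the namenode kept two parallel maps, FSDirectory.activeBlocks (Block -> INode) and FSNamesystem.blocksMap (Block -> list of datanodes). The patch folds both into a single Block -> BlockInfo map owned by FSNamesystem. A minimal standalone sketch of the idea, using simplified stand-in types rather than the real Hadoop classes:

import java.util.HashMap;
import java.util.Map;

// Sketch only: one map answers both "which file owns this block?" and
// "which datanodes store it?", where the old code kept two separate maps.
class SingleMapSketch {
    static class BlockInfo {
        Object inode;        // owning file, or null if the block is orphaned
        Object[] datanodes;  // storing nodes, or null if none reported yet
    }

    private final Map<Long, BlockInfo> map = new HashMap<Long, BlockInfo>();

    BlockInfo lookup(long blockId) { return map.get(blockId); }

    void addINode(long blockId, Object inode) {
        BlockInfo info = map.get(blockId);
        if (info == null) { info = new BlockInfo(); map.put(blockId, info); }
        info.inode = inode;
    }
}
]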
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=526271&r1=526270&r2=526271
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Apr 6 13:29:23 2007
@@ -112,6 +112,9 @@
35. HADOOP-1151. Remove spurious printing to stderr in streaming
PipeMapRed. (Koji Noguchi via tomwhite)
+36. HADOOP-988. Change namenode to use a single map of blocks to metadata.
+ (Raghu Angadi via tomwhite)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=526271&r1=526270&r2=526271
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Fri Apr 6 13:29:23 2007
@@ -131,10 +131,10 @@
}
}
public boolean equals(Object o) {
- return (this.compareTo(o) == 0);
+ return blkid == ((Block)o).blkid;
}
public int hashCode() {
- return 37 * 17 + (int) (getBlockId()^(getBlockId()>>>32));
+ return 37 * 17 + (int) (blkid^(blkid>>>32));
}
}
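[The new equals()/hashCode() pair keeps Block usable as a HashMap key for the single map: two blocks are equal exactly when their ids match, and the hash folds the 64-bit id into 32 bits the same way Long.hashCode() does. A hedged sketch of the contract; note the patch's equals() casts unconditionally, so comparing against a non-Block would throw ClassCastException, and the sketch adds an instanceof guard purely for illustration:

// Stripped-down Block showing the equals/hashCode contract the map relies on.
class BlockIdSketch {
    final long blkid;
    BlockIdSketch(long id) { blkid = id; }

    public boolean equals(Object o) {
        return o instanceof BlockIdSketch && blkid == ((BlockIdSketch) o).blkid;
    }
    public int hashCode() {
        // same 64-to-32-bit fold as the patch: high word XOR low word
        return 37 * 17 + (int) (blkid ^ (blkid >>> 32));
    }
}
]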
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?view=diff&rev=526271&r1=526270&r2=526271
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Fri Apr 6 13:29:23 2007
@@ -308,9 +308,8 @@
}
}
-
+ FSNamesystem namesystem = null;
INode rootDir = new INode("");
- Map activeBlocks = new HashMap();
TreeMap activeLocks = new TreeMap();
FSImage fsImage;
boolean ready = false;
@@ -318,13 +317,15 @@
private MetricsRecord directoryMetrics = null;
/** Access an existing dfs name directory. */
- public FSDirectory() throws IOException {
+ public FSDirectory(FSNamesystem ns) throws IOException {
this.fsImage = new FSImage();
+ namesystem = ns;
initialize();
}
- public FSDirectory(FSImage fsImage) throws IOException {
+ public FSDirectory(FSImage fsImage, FSNamesystem ns) throws IOException {
this.fsImage = fsImage;
+ namesystem = ns;
initialize();
}
@@ -415,7 +416,7 @@
int nrBlocks = (newNode.blocks == null) ? 0 : newNode.blocks.length;
// Add file->block mapping
for (int i = 0; i < nrBlocks; i++)
- activeBlocks.put(newNode.blocks[i], newNode);
+ namesystem.blocksMap.addINode(newNode.blocks[i], newNode);
return true;
} else {
return false;
@@ -586,7 +587,7 @@
targetNode.collectSubtreeBlocks(v);
for (Iterator it = v.iterator(); it.hasNext(); ) {
Block b = (Block) it.next();
- activeBlocks.remove(b);
+ namesystem.blocksMap.removeINode(b);
}
return (Block[]) v.toArray(new Block[v.size()]);
}
@@ -755,27 +756,5 @@
srcs = srcs.substring(0, srcs.length() - 1);
}
return srcs;
- }
-
- /**
- * Returns whether the given block is one pointed-to by a file.
- */
- public boolean isValidBlock(Block b) {
- synchronized (rootDir) {
- if (activeBlocks.containsKey(b)) {
- return true;
- } else {
- return false;
- }
- }
- }
-
- /**
- * Returns whether the given block is one pointed-to by a file.
- */
- public INode getFileByBlock(Block b) {
- synchronized (rootDir) {
- return (INode)activeBlocks.get(b);
- }
}
}
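[With activeBlocks gone, FSDirectory records and clears file-to-block mappings through the namesystem handed to it at construction time. A rough sketch of that constructor-injection pattern, with hypothetical simplified types, not the real Hadoop classes:

// Sketch of the delegation: the directory no longer owns a block map and
// instead updates the namesystem's shared one. Types are illustrative.
class DirectorySketch {
    interface SharedBlocksMap {
        void addINode(long blockId, Object file);
        void removeINode(long blockId);
    }

    // injected, as in the new FSDirectory(FSNamesystem) constructors
    private final SharedBlocksMap blocksMap;

    DirectorySketch(SharedBlocksMap m) { blocksMap = m; }

    void addFile(Object file, long[] blockIds) {
        for (long id : blockIds) blocksMap.addINode(id, file);
    }
    void deleteFile(long[] blockIds) {
        for (long id : blockIds) blocksMap.removeINode(id);
    }
}
]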
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=526271&r1=526270&r2=526271
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri Apr 6 13:29:23 2007
@@ -30,6 +30,7 @@
import java.io.*;
import java.util.*;
+import java.lang.UnsupportedOperationException;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
@@ -60,11 +61,10 @@
//
// Stores the block-->datanode(s) map. Updated only in response
// to client-sent information.
- // Mapping: Block -> TreeSet<DatanodeDescriptor>
+ // Mapping: Block -> { INode, datanodes, self ref }
//
- Map<Block, List<DatanodeDescriptor>> blocksMap =
- new HashMap<Block, List<DatanodeDescriptor>>();
-
+ BlocksMap blocksMap = new BlocksMap();
+
/**
* Stores the datanode -> block map.
* <p>
@@ -245,7 +245,7 @@
this.localMachine = hostname;
this.port = port;
- this.dir = new FSDirectory();
+ this.dir = new FSDirectory( this );
StartupOption startOpt = (StartupOption)conf.get(
"dfs.namenode.startup", StartupOption.REGULAR );
this.dir.loadFSImage( getNamespaceDirs(conf), startOpt );
@@ -299,7 +299,7 @@
*/
FSNamesystem(FSImage fsImage) throws IOException {
fsNamesystemObject = this;
- this.dir = new FSDirectory(fsImage);
+ this.dir = new FSDirectory(fsImage, this);
}
/** Return the FSNamesystem object
@@ -366,14 +366,11 @@
for (Iterator<Block> it = neededReplications.iterator();
it.hasNext();) {
Block block = it.next();
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
out.print(block);
- if (containingNodes != null) {
- for (Iterator<DatanodeDescriptor> jt = containingNodes.iterator();
- jt.hasNext(); ) {
- DatanodeDescriptor node = jt.next();
- out.print(" " + node + " : " );
- }
+ for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+ jt.hasNext(); ) {
+ DatanodeDescriptor node = jt.next();
+ out.print(" " + node + " : " );
}
out.println("");
}
@@ -401,7 +398,7 @@
/* get replication factor of a block */
private int getReplication( Block block ) {
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
if( fileINode == null ) { // block does not belong to any file
return 0;
} else {
@@ -485,9 +482,10 @@
/* add a block to a under replication queue */
synchronized boolean add(Block block) {
- int curReplicas = countContainingNodes(blocksMap.get(block));
int expectedReplicas = getReplication(block);
- return add(block, curReplicas, expectedReplicas);
+ return add(block,
+ countContainingNodes( block ),
+ expectedReplicas);
}
/* remove a block from a under replication queue */
@@ -522,7 +520,7 @@
/* remove a block from a under replication queue */
synchronized boolean remove(Block block) {
- int curReplicas = countContainingNodes(blocksMap.get(block));
+ int curReplicas = countContainingNodes( block );
int expectedReplicas = getReplication(block);
return remove(block, curReplicas, expectedReplicas);
}
@@ -530,7 +528,7 @@
/* update the priority level of a block */
synchronized void update(Block block,
int curReplicasDelta, int expectedReplicasDelta) {
- int curReplicas = countContainingNodes(blocksMap.get(block));
+ int curReplicas = countContainingNodes( block );
int curExpectedReplicas = getReplication(block);
int oldReplicas = curReplicas-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
@@ -614,20 +612,20 @@
if (blocks != null) {
results = new Object[2];
DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
- DatanodeDescriptor clientNode = getDatanodeByHost(clientMachine);
for (int i = 0; i < blocks.length; i++) {
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
- if (containingNodes == null) {
+ int numNodes = blocksMap.numNodes( blocks[i] );
+ if ( numNodes <= 0 ) {
machineSets[i] = new DatanodeDescriptor[0];
} else {
- machineSets[i] = new DatanodeDescriptor[containingNodes.size()];
- ArrayList<DatanodeDescriptor> containingNodesList =
- new ArrayList<DatanodeDescriptor>(containingNodes.size());
- containingNodesList.addAll(containingNodes);
-
- machineSets[i] = replicator.sortByDistance(
- clientNode, containingNodesList);
+ machineSets[i] = new DatanodeDescriptor[ numNodes ];
+ numNodes = 0;
+ for( Iterator<DatanodeDescriptor> it =
+ blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
+ machineSets[i][ numNodes++ ] = it.next();
+ }
+ clusterMap.sortByDistance( getDatanodeByHost(clientMachine),
+ machineSets[i] );
}
}
@@ -998,9 +996,7 @@
//
for (int i = 0; i < nrBlocks; i++) {
Block b = pendingBlocks[i];
- List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
- Block storedBlock =
- containingNodes.get(0).getBlock(b);
+ Block storedBlock = blocksMap.getStoredBlock( b );
if ( storedBlock != null ) {
pendingBlocks[i] = storedBlock;
}
@@ -1044,10 +1040,8 @@
// the blocks.
int numExpectedReplicas = pendingFile.getReplication();
for (int i = 0; i < nrBlocks; i++) {
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
- // filter out containingNodes that are marked for decommission.
- int numCurrentReplica = countContainingNodes(containingNodes);
-
+ // filter out containingNodes that are marked for decommission.
+ int numCurrentReplica = countContainingNodes( pendingBlocks[i] );
if (numCurrentReplica < numExpectedReplicas) {
neededReplications.add(
pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
@@ -1065,7 +1059,7 @@
Block b = null;
do {
b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
- } while (dir.isValidBlock(b));
+ } while ( isValidBlock(b) );
FileUnderConstruction v = pendingCreates.get(src);
v.getBlocks().add(b);
pendingCreateBlocks.add(b);
@@ -1083,9 +1077,7 @@
FileUnderConstruction v = pendingCreates.get(src);
for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
- Block b = it.next();
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
- if (containingNodes == null || containingNodes.size() < this.minReplication) {
+ if ( blocksMap.numNodes(it.next()) < this.minReplication ) {
return false;
}
}
@@ -1145,24 +1137,20 @@
throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
}
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
-
// Check how many copies we have of the block. If we have at least one
// copy on a live node, then we can delete it.
- if (containingNodes != null ) {
- if ((countContainingNodes(containingNodes) > 1) ||
- ((countContainingNodes(containingNodes) == 1) &&
- (dn.isDecommissionInProgress() || dn.isDecommissioned()))) {
+ int count = countContainingNodes( blk );
+ if ( (count > 1) || ( (count == 1) && ( dn.isDecommissionInProgress() ||
+ dn.isDecommissioned() ))) {
addToInvalidates(blk, dn);
removeStoredBlock(blk, getDatanode(dn));
NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+ blk.getBlockName() + " on "
+ dn.getName() + " listed for deletion.");
- } else {
+ } else {
NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+ blk.getBlockName() + " on "
+ dn.getName() + " is the only copy and was not deleted.");
- }
}
}
@@ -1202,15 +1190,14 @@
if (deletedBlocks != null) {
for (int i = 0; i < deletedBlocks.length; i++) {
Block b = deletedBlocks[i];
-
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
- if (containingNodes != null) {
- for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
- DatanodeDescriptor node = it.next();
- addToInvalidates(b, node);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
- + b.getBlockName() + " is added to invalidSet of " + node.getName() );
- }
+
+ for ( Iterator<DatanodeDescriptor> it =
+ blocksMap.nodeIterator( b ); it.hasNext(); ) {
+ DatanodeDescriptor node = it.next();
+ addToInvalidates(b, node);
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
+ + b.getBlockName() + " is added to invalidSet of "
+ + node.getName() );
}
}
}
@@ -1327,12 +1314,10 @@
} else {
String hosts[][] = new String[(endBlock - startBlock) + 1][];
for (int i = startBlock; i <= endBlock; i++) {
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
Collection<String> v = new ArrayList<String>();
- if (containingNodes != null) {
- for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
+ for ( Iterator<DatanodeDescriptor> it =
+ blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
v.add( it.next().getHostName() );
- }
}
hosts[i-startBlock] = v.toArray(new String[v.size()]);
}
@@ -2169,7 +2154,7 @@
// they are added to recentInvalidateSets and will be sent out
// through succeeding heartbeat responses.
//
- if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
+ if (! isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
if (obsolete.size() > FSConstants.BLOCK_INVALIDATE_CHUNK) {
addToInvalidates(b, node);
} else {
@@ -2188,28 +2173,23 @@
* @return the block that is stored in blockMap.
*/
synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
- List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
- if (containingNodes == null) {
- //Create an arraylist with the current replication factor
- FSDirectory.INode inode = dir.getFileByBlock(block);
- int replication = (inode != null) ?
- inode.getReplication() : defaultReplication;
- containingNodes = new ArrayList<DatanodeDescriptor>(replication);
- blocksMap.put(block, containingNodes);
- } else {
- Block storedBlock =
- containingNodes.get(0).getBlock(block);
- // update stored block's length.
- if ( storedBlock != null ) {
- if ( block.getNumBytes() > 0 ) {
- storedBlock.setNumBytes( block.getNumBytes() );
- }
- block = storedBlock;
+
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
+ int replication = (fileINode != null) ? fileINode.getReplication() :
+ defaultReplication;
+ boolean added = blocksMap.addNode( block, node, replication );
+
+ Block storedBlock = blocksMap.getStoredBlock( block ); //extra look up!
+ if ( storedBlock != null && block != storedBlock ) {
+ if ( block.getNumBytes() > 0 ) {
+ storedBlock.setNumBytes( block.getNumBytes() );
}
+ block = storedBlock;
}
+
int curReplicaDelta = 0;
- if (! containingNodes.contains(node)) {
- containingNodes.add(node);
+
+ if ( added ) {
curReplicaDelta = 1;
//
// Hairong: I would prefer to set the level of next logrecord
@@ -2226,12 +2206,11 @@
+ block.getBlockName() + " on " + node.getName());
}
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
if( fileINode == null ) // block does not belong to any file
return block;
// filter out containingNodes that are marked for decommission.
- int numCurrentReplica = countContainingNodes(containingNodes)
+ int numCurrentReplica = countContainingNodes( block )
+ pendingReplications.getNumReplicas(block);
// check whether safe replication is reached for the block
@@ -2255,11 +2234,9 @@
* mark them in the excessReplicateMap.
*/
private void proccessOverReplicatedBlock( Block block, short replication ) {
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
- if( containingNodes == null )
- return;
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
- for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
+ for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator( block );
+ it.hasNext(); ) {
DatanodeDescriptor cur = it.next();
Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
if (excessBlocks == null || ! excessBlocks.contains(block)) {
@@ -2335,27 +2312,20 @@
synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+block.getBlockName() + " from "+node.getName() );
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
- if (containingNodes == null || ! containingNodes.contains(node)) {
+ if ( !blocksMap.removeNode( block, node ) ) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+block.getBlockName()+" has already been removed from node "+node );
return;
}
- containingNodes.remove(node);
- // filter out containingNodes that are marked for decommission.
- int numCurrentReplica = countContainingNodes(containingNodes);
-
- decrementSafeBlockCount( numCurrentReplica );
- if( containingNodes.isEmpty() )
- blocksMap.remove(block);
+ decrementSafeBlockCount( block );
//
// It's possible that the block was removed because of a datanode
// failure. If the block is still valid, check if replication is
// necessary. In that case, put block on a possibly-will-
// be-replicated list.
//
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
if( fileINode != null ) {
neededReplications.update(block, -1, 0);
}
@@ -2636,28 +2606,27 @@
* Counts the number of nodes in the given list. Skips over nodes
* that are marked for decommission.
*/
- private int countContainingNodes(Collection<DatanodeDescriptor> nodelist) {
- if( nodelist == null ) return 0;
+ private int countContainingNodes(Iterator<DatanodeDescriptor> nodeIter) {
int count = 0;
- for (Iterator<DatanodeDescriptor> it = nodelist.iterator();
- it.hasNext(); ) {
- DatanodeDescriptor node = it.next();
+ while ( nodeIter.hasNext() ) {
+ DatanodeDescriptor node = nodeIter.next();
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
count++;
}
}
return count;
}
+
+ /** wrapper for countContainingNodes( Iterator ). */
+ private int countContainingNodes( Block b ) {
+ return countContainingNodes( blocksMap.nodeIterator( b ) );
+ }
- /*
- * Filter nodes that are marked for decommison in the given list.
- * Return a list of non-decommissioned nodes
- */
- private List<DatanodeDescriptor> filterDecommissionedNodes(
- Collection<DatanodeDescriptor> nodelist) {
- List<DatanodeDescriptor> nonCommissionedNodeList =
+ /** Returns a newly allocated list excluding the decommissioned nodes. */
+ ArrayList<DatanodeDescriptor> containingNodeList( Block b ) {
+ ArrayList<DatanodeDescriptor> nonCommissionedNodeList =
new ArrayList<DatanodeDescriptor>();
- for (Iterator<DatanodeDescriptor> it = nodelist.iterator();
+ for( Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator( b );
it.hasNext(); ) {
DatanodeDescriptor node = it.next();
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
@@ -2674,15 +2643,9 @@
Block decommissionBlocks[] = srcNode.getBlocks();
for (int i = 0; i < decommissionBlocks.length; i++) {
Block block = decommissionBlocks[i];
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
- if (fileINode == null) {
- continue;
- }
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
- List<DatanodeDescriptor> nodes =
- filterDecommissionedNodes(containingNodes);
- int numCurrentReplica = nodes.size();
- if (fileINode.getReplication() > numCurrentReplica) {
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
+ if ( fileINode != null &&
+ fileINode.getReplication() > countContainingNodes(block) ) {
return true;
}
}
@@ -2758,22 +2721,20 @@
}
Block block = it.next();
long blockSize = block.getNumBytes();
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
if (fileINode == null) { // block does not belong to any file
it.remove();
} else {
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+ List<DatanodeDescriptor> containingNodes =
+ containingNodeList(block);
Collection<Block> excessBlocks = excessReplicateMap.get(
srcNode.getStorageID() );
// srcNode must contain the block, and the block must
// not be scheduled for removal on that node
- if (containingNodes != null && containingNodes.contains(srcNode)
+ if (containingNodes.contains(srcNode)
&& (excessBlocks == null || ! excessBlocks.contains(block))) {
- // filter out containingNodes that are marked for decommission.
- List<DatanodeDescriptor> nodes =
- filterDecommissionedNodes(containingNodes);
- int numCurrentReplica = nodes.size() +
+ int numCurrentReplica = containingNodes.size() +
pendingReplications.getNumReplicas(block);
if (numCurrentReplica >= fileINode.getReplication()) {
it.remove();
@@ -2782,7 +2743,7 @@
Math.min( fileINode.getReplication() - numCurrentReplica,
needed),
datanodeMap.get(srcNode.getStorageID()),
- nodes, null, blockSize);
+ containingNodes, null, blockSize);
if (targets.length > 0) {
// Build items to return
replicateBlocks.add(block);
@@ -2808,7 +2769,7 @@
DatanodeDescriptor targets[] =
(DatanodeDescriptor[]) replicateTargetSets.get(i);
int numCurrentReplica = numCurrentReplicas.get(i).intValue();
- int numExpectedReplica = dir.getFileByBlock( block).getReplication();
+ int numExpectedReplica = blocksMap.getINode(block).getReplication();
if (numCurrentReplica + targets.length >= numExpectedReplica) {
neededReplications.remove(
block, numCurrentReplica, numExpectedReplica);
@@ -3291,29 +3252,8 @@
}
return nodes.toArray( results );
}
-
- /** Return datanodes that sorted by their distances to <i>reader</i>
- */
- DatanodeDescriptor[] sortByDistance(
- final DatanodeDescriptor reader,
- List<DatanodeDescriptor> nodes ) {
- synchronized(clusterMap) {
- if(reader != null && clusterMap.contains(reader)) {
- java.util.Collections.sort(nodes, new Comparator<DatanodeDescriptor>() {
- public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
- return clusterMap.getDistance(reader, n1)
- -clusterMap.getDistance(reader, n2);
- }
- });
- }
- }
- return (DatanodeDescriptor[])nodes.toArray(
- new DatanodeDescriptor[nodes.size()]);
- }
-
} //end of Replicator
-
// Keeps track of which datanodes are allowed to connect to the namenode.
private boolean inHostsList(DatanodeID node) {
@@ -3582,7 +3522,7 @@
* <em>safe blocks</em>, those that have at least the minimal number of
* replicas, and calculates the ratio of safe blocks to the total number
* of blocks in the system, which is the size of
- * {@link FSDirectory#activeBlocks}. When the ratio reaches the
+ * {@link blocksMap}. When the ratio reaches the
* {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
* to monitor whether the safe mode extension is passed. Then it leaves safe
* mode and destroys itself.
@@ -3654,7 +3594,9 @@
*/
synchronized boolean isOn() {
try {
- isConsistent(); // SHV this is an assert
+ assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
+ + "Total num of blocks, active blocks, or "
+ + "total safe blocks don't match.";
} catch( IOException e ) {
System.err.print( StringUtils.stringifyException( e ));
}
@@ -3801,26 +3743,20 @@
/**
* Checks consistency of the class state.
+ * This is costly and currently called only in assert.
*/
- void isConsistent() throws IOException {
+ boolean isConsistent() throws IOException {
if( blockTotal == -1 && blockSafe == -1 ) {
- return; // manual safe mode
+ return true; // manual safe mode
}
- int activeBlocks = dir.activeBlocks.size();
- if( blockTotal != activeBlocks )
- throw new IOException( "blockTotal " + blockTotal
- + " does not match all blocks count. "
- + "activeBlocks = " + activeBlocks
- + ". safeBlocks = " + blockSafe
- + " safeMode is: "
- + ((safeMode == null) ? "null" : safeMode.toString()) );
- if( blockSafe < 0 || blockSafe > blockTotal )
- throw new IOException( "blockSafe " + blockSafe
- + " is out of range [0," + blockTotal + "]. "
- + "activeBlocks = " + activeBlocks
- + " safeMode is: "
- + ((safeMode == null) ? "null" : safeMode.toString()) );
- }
+ int activeBlocks = blocksMap.map.size();
+ for( Iterator<Collection<Block>> it =
+ recentInvalidateSets.values().iterator(); it.hasNext(); ) {
+ activeBlocks -= it.next().size();
+ }
+ return ( blockTotal == activeBlocks ) ||
+ ( blockSafe >= 0 && blockSafe <= blockTotal );
+ }
}
/**
@@ -3880,10 +3816,10 @@
* Decrement number of blocks that reached minimal replication.
* @param replication current replication
*/
- void decrementSafeBlockCount( int replication ) {
- if( safeMode == null )
+ void decrementSafeBlockCount( Block b ) {
+ if( safeMode == null ) // mostly true
return;
- safeMode.decrementSafeBlockCount( (short)replication );
+ safeMode.decrementSafeBlockCount( (short)countContainingNodes( b ) );
}
/**
@@ -3892,7 +3828,7 @@
void setBlockTotal() {
if( safeMode == null )
return;
- safeMode.setBlockTotal( dir.activeBlocks.size() );
+ safeMode.setBlockTotal( blocksMap.map.size() );
}
/**
@@ -4013,6 +3949,177 @@
response.sendError(HttpServletResponse.SC_GONE, errMsg);
throw ie;
}
+ }
+ }
+
+ /**
+ * Returns whether the given block is one pointed-to by a file.
+ */
+ public boolean isValidBlock(Block b) {
+ return blocksMap.getINode( b ) != null;
+ }
+
+ /**
+ * This class maintains the map from a block to its metadata.
+ * block's metadata currently includes INode it belongs to and
+ * the datanodes that store the block.
+ */
+ class BlocksMap {
+
+ /**
+ * Internal class for block metadata
+ */
+ private class BlockInfo {
+ FSDirectory.INode inode;
+
+ /** nodes could contain some null entries at the end, so
+ * nodes.length >= number of datanodes.
+ * if nodes != null then nodes[0] != null.
+ */
+ DatanodeDescriptor nodes[];
+ Block block; //block that was inserted.
+ }
+
+ private class NodeIterator implements Iterator<DatanodeDescriptor> {
+ NodeIterator( DatanodeDescriptor[] nodes ) {
+ arr = nodes;
+ }
+ DatanodeDescriptor[] arr;
+ int nextIdx = 0;
+
+ public boolean hasNext() {
+ return arr != null && nextIdx < arr.length && arr[nextIdx] != null;
+ }
+
+ public DatanodeDescriptor next() {
+ return arr[nextIdx++];
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException( "Sorry. can't remove." );
+ }
+ }
+
+ Map<Block, BlockInfo> map = new HashMap<Block, BlockInfo>();
+
+ /** add BlockInfo if mapping does not exist */
+ private BlockInfo checkBlockInfo( Block b ) {
+ BlockInfo info = map.get( b );
+ if ( info == null ) {
+ info = new BlockInfo();
+ info.block = b;
+ map.put( b, info );
+ }
+ return info;
+ }
+
+ public FSDirectory.INode getINode( Block b ) {
+ BlockInfo info = map.get( b );
+ return ( info != null ) ? info.inode : null;
+ }
+
+ public void addINode( Block b, FSDirectory.INode iNode ) {
+ BlockInfo info = checkBlockInfo( b );
+ info.inode = iNode;
+ }
+
+ public void removeINode( Block b ) {
+ BlockInfo info = map.get( b );
+ if ( info != null ) {
+ info.inode = null;
+ if ( info.nodes == null ) {
+ map.remove( b );
+ }
+ }
+ }
+
+ /** Returns the block object if it exists in the map */
+ public Block getStoredBlock( Block b ) {
+ BlockInfo info = map.get( b );
+ return ( info != null ) ? info.block : null;
+ }
+
+ /** Returned Iterator does not support remove(). */
+ public Iterator<DatanodeDescriptor> nodeIterator( Block b ) {
+ BlockInfo info = map.get( b );
+ return new NodeIterator( ( info != null ) ? info.nodes : null );
+ }
+
+ /** counts number of containing nodes. Better than using iterator. */
+ public int numNodes( Block b ) {
+ int count = 0;
+ BlockInfo info = map.get( b );
+ if ( info != null && info.nodes != null ) {
+ count = info.nodes.length;
+ while ( info.nodes[ count-1 ] == null ) // mostly false
+ count--;
+ }
+ return count;
+ }
+
+ /** returns true if the node does not already exist and is added.
+ * false if the node already exists. */
+ public boolean addNode( Block b,
+ DatanodeDescriptor node,
+ int replicationHint ) {
+ BlockInfo info = checkBlockInfo( b );
+ if ( info.nodes == null ) {
+ info.nodes = new DatanodeDescriptor[ replicationHint ];
+ }
+
+ DatanodeDescriptor[] arr = info.nodes;
+ for( int i=0; i < arr.length; i++ ) {
+ if ( arr[i] == null ) {
+ arr[i] = node;
+ return true;
+ }
+ if ( arr[i] == node ) {
+ return false;
+ }
+ }
+
+ /* Not enough space left. Create a new array. Should normally
+ * happen only when replication is manually increased by the user. */
+ info.nodes = new DatanodeDescriptor[ arr.length + 1 ];
+ for( int i=0; i < arr.length; i++ ) {
+ info.nodes[i] = arr[i];
+ }
+ info.nodes[ arr.length ] = node;
+ return true;
+ }
+
+ public boolean removeNode( Block b, DatanodeDescriptor node ) {
+ BlockInfo info = map.get( b );
+ if ( info == null || info.nodes == null )
+ return false;
+
+ boolean removed = false;
+ // swap lastNode and node's location. set lastNode to null.
+ DatanodeDescriptor[] arr = info.nodes;
+ int lastNode = -1;
+ for( int i=arr.length-1; i >= 0; i-- ) {
+ if ( lastNode < 0 && arr[i] != null )
+ lastNode = i;
+ if ( arr[i] == node ) {
+ arr[i] = arr[ lastNode ];
+ arr[ lastNode ] = null;
+ removed = true;
+ break;
+ }
+ }
+
+ /*
+ * if ( (lastNode + 1) < arr.length/4 ) {
+ * we could trim the array.
+ * }
+ */
+ if ( arr[0] == null ) { // no datanodes left.
+ info.nodes = null;
+ if ( info.inode == null ) {
+ map.remove( b );
+ }
+ }
+ return removed;
}
}
}
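[Two BlocksMap details are worth calling out. addNode() grows the per-block array by one slot when the replication hint is exceeded, and removeNode() keeps occupied slots packed at the front by swapping the removed entry with the last non-null one, which is what lets numNodes() and NodeIterator stop at the first null. A self-contained sketch of that swap-with-last removal, under the same invariant (a non-null array has a non-null slot 0):

import java.util.Arrays;

// Standalone sketch of BlocksMap.removeNode's compaction: scan from the end,
// remember the last occupied slot, and move it into the hole left by the
// removed entry so non-null entries stay contiguous at the front.
class SwapWithLastSketch {
    static boolean remove(Object[] arr, Object item) {
        int last = -1;
        for (int i = arr.length - 1; i >= 0; i--) {
            if (last < 0 && arr[i] != null) last = i;  // last occupied slot
            if (arr[i] == item) {                      // identity, as in the patch
                arr[i] = arr[last];                    // fill the hole
                arr[last] = null;                      // free the tail slot
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Object a = "a", b = "b", c = "c";
        Object[] nodes = { a, b, c, null };
        remove(nodes, a);
        System.out.println(Arrays.toString(nodes));    // [c, b, null, null]
    }
}
]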
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java?view=diff&rev=526271&r1=526270&r2=526271
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java Fri Apr 6 13:29:23 2007
@@ -19,8 +19,10 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
import java.util.Random;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -531,5 +533,26 @@
tree.append( "\n");
}
return tree.toString();
+ }
+
+ /* Set and used only inside sortByDistance.
+ * This saves an allocation each time we sort.
+ */
+ private DatanodeDescriptor distFrom = null;
+ private final Comparator<DatanodeDescriptor> nodeDistanceComparator =
+ new Comparator<DatanodeDescriptor>() {
+ public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
+ return getDistance(distFrom, n1) - getDistance(distFrom, n2);
+ }
+ };
+
+ /** Sorts nodes array by their distances to <i>reader</i>. */
+ public synchronized void sortByDistance( final DatanodeDescriptor reader,
+ DatanodeDescriptor[] nodes ) {
+ if(reader != null && contains(reader)) {
+ distFrom = reader;
+ Arrays.sort( nodes, nodeDistanceComparator );
+ distFrom = null;
+ }
}
}
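[The comparator added to NetworkTopology avoids allocating a new Comparator on every sort by reading a field (distFrom) that is written only while the caller holds the object's lock; without the synchronized on sortByDistance, two concurrent sorts would race on that field. A minimal sketch of the same pattern with a stand-in distance function:

import java.util.Arrays;
import java.util.Comparator;

// Sketch of the shared-comparator trick: one long-lived comparator reads a
// field set under the lock, so sorting allocates nothing per call.
class SharedComparatorSketch {
    private int origin;  // set and used only inside sort(), under the lock
    private final Comparator<Integer> byDistance = new Comparator<Integer>() {
        public int compare(Integer a, Integer b) {
            return Math.abs(a - origin) - Math.abs(b - origin);
        }
    };

    synchronized void sort(int reader, Integer[] nodes) {
        origin = reader;                 // publish under the lock
        Arrays.sort(nodes, byDistance);
        origin = 0;                      // clear, mirroring distFrom = null
    }

    public static void main(String[] args) {
        SharedComparatorSketch s = new SharedComparatorSketch();
        Integer[] nodes = { 10, 3, 7 };
        s.sort(5, nodes);
        System.out.println(Arrays.toString(nodes));  // [3, 7, 10]
    }
}
]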