Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/16 21:11:43 UTC
svn commit: r496845 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/dfs/Block.java
src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
src/java/org/apache/hadoop/dfs/FSNamesystem.java
Author: cutting
Date: Tue Jan 16 12:11:42 2007
New Revision: 496845
URL: http://svn.apache.org/viewvc?view=rev&rev=496845
Log:
HADOOP-803. Reduce memory footprint of HDFS namenode by replacing the TreeSet of block locations with an ArrayList. Contributed by Raghu.
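For context on the memory saving: every element of a java.util.TreeSet is backed by a red-black-tree node carrying key, value, parent, left, and right references plus a color flag, while an ArrayList presized to the replication factor stores bare references in a single backing array. A minimal sketch of the before/after shape of the per-block container (illustrative only; String stands in for DatanodeDescriptor):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.SortedSet;
    import java.util.TreeSet;

    // Illustrative only: contrasts the container this patch swaps out.
    class BlockMapShapes {
        // Before: Block -> TreeSet of replica locations; each element costs
        // a tree node (five references plus a color flag) on top of the
        // reference itself.
        Map<String, SortedSet<String>> before = new HashMap<String, SortedSet<String>>();

        // After: Block -> ArrayList presized to the replication factor, so a
        // block with three replicas costs one small backing array.
        Map<String, List<String>> after = new HashMap<String, List<String>>();

        SortedSet<String> newTreeLocations() {
            return new TreeSet<String>();              // old per-block container
        }

        List<String> newListLocations(int replication) {
            return new ArrayList<String>(replication); // new per-block container
        }
    }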
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jan 16 12:11:42 2007
@@ -17,6 +17,10 @@
4. HADOOP-757. Fix "Bad File Descriptor" exception in HDFS client
when an output file is closed twice. (Raghu Angadi via cutting)
+ 5. HADOOP-803. Reduce memory footprint of HDFS namenode by replacing
+ the TreeSet of block locations with an ArrayList.
+ (Raghu Angadi via cutting)
+
Release 0.10.1 - 2007-01-10
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Tue Jan 16 12:11:42 2007
@@ -121,18 +121,11 @@
// Comparable
/////////////////////////////////////
public int compareTo(Object o) {
- Block b = (Block) o;
- if (getBlockId() < b.getBlockId()) {
- return -1;
- } else if (getBlockId() == b.getBlockId()) {
- return 0;
- } else {
- return 1;
- }
+ long diff = getBlockId() - ((Block)o).getBlockId();
+ return ( diff < 0 ) ? -1 : ( ( diff > 0 ) ? 1 : 0 );
}
public boolean equals(Object o) {
- Block b = (Block) o;
- return (this.compareTo(b) == 0);
+ return (this.compareTo(o) == 0);
}
public int hashCode() {
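One caveat about the new subtraction-based compareTo above: when two block ids are far enough apart, the long subtraction overflows and the sign test reports the wrong order. Block ids are drawn at random from the full long range, so distant pairs do occur. An overflow-safe variant keeps an explicit comparison, much like the code being removed (a sketch; Long.compare, which does the same thing, only arrived in Java 7):

    class BlockIdCompareDemo {
        // Overflow-safe ordering; equivalent to the branchy comparison this
        // patch removes.
        static int compareBlockIds(long a, long b) {
            return (a < b) ? -1 : ((a > b) ? 1 : 0);
        }

        public static void main(String[] args) {
            // The subtraction form wraps around for distant ids:
            long diff = Long.MIN_VALUE - 1L;                         // overflows to Long.MAX_VALUE
            System.out.println(diff > 0);                            // true: wrong sign
            System.out.println(compareBlockIds(Long.MIN_VALUE, 1L)); // -1: correct
        }
    }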
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Tue Jan 16 12:11:42 2007
@@ -34,7 +34,7 @@
**************************************************/
class DatanodeDescriptor extends DatanodeInfo {
- private volatile Collection<Block> blocks = new TreeSet<Block>();
+ private volatile SortedMap<Block, Block> blocks = new TreeMap<Block, Block>();
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
protected boolean isAlive = false;
@@ -60,17 +60,12 @@
/**
*/
- void updateBlocks(Block newBlocks[]) {
- blocks.clear();
- for (int i = 0; i < newBlocks.length; i++) {
- blocks.add(newBlocks[i]);
- }
- }
-
- /**
- */
void addBlock(Block b) {
- blocks.add(b);
+ blocks.put(b, b);
+ }
+
+ void removeBlock(Block b) {
+ blocks.remove(b);
}
void resetBlocks() {
@@ -94,10 +89,14 @@
}
Block[] getBlocks() {
- return (Block[]) blocks.toArray(new Block[blocks.size()]);
+ return blocks.keySet().toArray(new Block[blocks.size()]);
}
Iterator<Block> getBlockIterator() {
- return blocks.iterator();
+ return blocks.keySet().iterator();
+ }
+
+ Block getBlock(long blockId) {
+ return blocks.get( new Block(blockId, 0) );
}
}
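The TreeMap<Block, Block> above is a map from each block to itself: addBlock stores the same object as key and value, and getBlock probes with a throwaway Block(blockId, 0), which works because Block's ordering considers the block id alone. A self-contained sketch of the idiom (BlockStub is a stand-in, not the dfs.Block from this patch):

    import java.util.SortedMap;
    import java.util.TreeMap;

    // Stand-in for dfs.Block: ordered by id alone; length is ignored.
    class BlockStub implements Comparable<BlockStub> {
        final long id;
        long numBytes;
        BlockStub(long id, long numBytes) { this.id = id; this.numBytes = numBytes; }
        public int compareTo(BlockStub o) {
            return (id < o.id) ? -1 : ((id > o.id) ? 1 : 0);
        }
    }

    class BlockSetDemo {
        public static void main(String[] args) {
            SortedMap<BlockStub, BlockStub> blocks = new TreeMap<BlockStub, BlockStub>();
            BlockStub b = new BlockStub(42L, 1024L);
            blocks.put(b, b);                         // addBlock(b)
            BlockStub probe = new BlockStub(42L, 0L); // length plays no part in lookup
            BlockStub stored = blocks.get(probe);     // getBlock(42L)
            System.out.println(stored.numBytes);      // prints 1024: the stored copy
        }
    }

This lookup is what lets FSNamesystem recover the stored block, with its up-to-date length, given only a block id.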
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Jan 16 12:11:42 2007
@@ -59,8 +59,8 @@
// to client-sent information.
// Mapping: Block -> TreeSet<DatanodeDescriptor>
//
- Map<Block, SortedSet<DatanodeDescriptor>> blocksMap =
- new HashMap<Block, SortedSet<DatanodeDescriptor>>();
+ Map<Block, List<DatanodeDescriptor>> blocksMap =
+ new HashMap<Block, List<DatanodeDescriptor>>();
/**
* Stores the datanode -> block map.
@@ -179,6 +179,8 @@
private int maxReplicationStreams;
// MIN_REPLICATION is how many copies we need in place or else we disallow the write
private int minReplication;
+ // Default replication
+ private int defaultReplication;
// heartbeatRecheckInterval is how often namenode checks for expired datanodes
private long heartbeatRecheckInterval;
// heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -199,6 +201,7 @@
int port,
NameNode nn, Configuration conf) throws IOException {
fsNamesystemObject = this;
+ this.defaultReplication = conf.getInt("dfs.replication", 3);
this.maxReplication = conf.getInt("dfs.replication.max", 512);
this.minReplication = conf.getInt("dfs.replication.min", 1);
if( minReplication <= 0 )
@@ -299,7 +302,7 @@
DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
for (int i = 0; i < blocks.length; i++) {
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
if (containingNodes == null) {
machineSets[i] = new DatanodeDescriptor[0];
} else {
@@ -660,22 +663,16 @@
//
// We have the pending blocks, but they won't have
// length info in them (as they were allocated before
- // data-write took place). So we need to add the correct
- // length info to each
- //
- // REMIND - mjc - this is very inefficient! We should
- // improve this!
+ // data-write took place). Find the block stored in
+ // node descriptor.
//
for (int i = 0; i < nrBlocks; i++) {
Block b = pendingBlocks[i];
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
- DatanodeDescriptor node = containingNodes.first();
- for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
- Block cur = it.next();
- if (b.getBlockId() == cur.getBlockId()) {
- b.setNumBytes(cur.getNumBytes());
- break;
- }
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+ Block storedBlock =
+ containingNodes.get(0).getBlock(b.getBlockId());
+ if ( storedBlock != null ) {
+ pendingBlocks[i] = storedBlock;
}
}
@@ -716,7 +713,7 @@
// Now that the file is real, we need to be sure to replicate
// the blocks.
for (int i = 0; i < nrBlocks; i++) {
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
// filter out containingNodes that are marked for decommission.
int numCurrentReplica = countContainingNodes(containingNodes);
@@ -761,7 +758,7 @@
for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
Block b = it.next();
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
if (containingNodes == null || containingNodes.size() < this.minReplication) {
return false;
}
@@ -806,7 +803,7 @@
for (int i = 0; i < deletedBlocks.length; i++) {
Block b = deletedBlocks[i];
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
if (containingNodes != null) {
for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
DatanodeDescriptor node = it.next();
@@ -935,7 +932,7 @@
} else {
String hosts[][] = new String[(endBlock - startBlock) + 1][];
for (int i = startBlock; i <= endBlock; i++) {
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
Collection<String> v = new ArrayList<String>();
if (containingNodes != null) {
for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
@@ -1494,12 +1491,16 @@
// between the old and new block report.
//
int newPos = 0;
- boolean modified = false;
Iterator<Block> iter = node.getBlockIterator();
Block oldblk = iter.hasNext() ? iter.next() : null;
Block newblk = (newReport != null && newReport.length > 0) ?
newReport[0] : null;
+ // common case is that most of the blocks from the datanode
+ // match blocks in the datanode descriptor.
+ Collection<Block> toRemove = new LinkedList<Block>();
+ Collection<Block> toAdd = new LinkedList<Block>();
+
while (oldblk != null || newblk != null) {
int cmp = (oldblk == null) ? 1 :
@@ -1513,25 +1514,25 @@
? newReport[newPos] : null;
} else if (cmp < 0) {
// The old report has a block the new one does not
+ toRemove.add(oldblk);
removeStoredBlock(oldblk, node);
- modified = true;
oldblk = iter.hasNext() ? iter.next() : null;
} else {
// The new report has a block the old one does not
- addStoredBlock(newblk, node);
- modified = true;
+ toAdd.add(addStoredBlock(newblk, node));
newPos++;
newblk = (newPos < newReport.length)
? newReport[newPos] : null;
}
}
- //
- // Modify node so it has the new blockreport
- //
- if (modified) {
- node.updateBlocks(newReport);
+
+ for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
+ node.removeBlock( i.next() );
}
-
+ for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
+ node.addBlock( i.next() );
+ }
+
//
// We've now completely updated the node's block report profile.
// We now go through all its blocks and find which ones are invalid,
@@ -1560,12 +1561,25 @@
/**
* Modify (block-->datanode) map. Remove block from set of
* needed replications if this takes care of the problem.
+ * @return the block that is stored in blockMap.
*/
- synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+ synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
if (containingNodes == null) {
- containingNodes = new TreeSet<DatanodeDescriptor>();
+ //Create an arraylist with the current replication factor
+ FSDirectory.INode inode = dir.getFileByBlock(block);
+ int replication = (inode != null) ?
+ inode.getReplication() : defaultReplication;
+ containingNodes = new ArrayList<DatanodeDescriptor>(replication);
blocksMap.put(block, containingNodes);
+ } else {
+ Block storedBlock =
+ containingNodes.get(0).getBlock(block.getBlockId());
+ // update stored block's length.
+ if ( block.getNumBytes() > 0 ) {
+ storedBlock.setNumBytes( block.getNumBytes() );
+ }
+ block = storedBlock;
}
if (! containingNodes.contains(node)) {
containingNodes.add(node);
@@ -1587,7 +1601,7 @@
synchronized (neededReplications) {
FSDirectory.INode fileINode = dir.getFileByBlock(block);
if( fileINode == null ) // block does not belong to any file
- return;
+ return block;
// filter out containingNodes that are marked for decommission.
int numCurrentReplica = countContainingNodes(containingNodes);
@@ -1612,6 +1626,7 @@
proccessOverReplicatedBlock( block, fileReplication );
}
+ return block;
}
/**
@@ -1620,7 +1635,7 @@
* mark them in the excessReplicateMap.
*/
private void proccessOverReplicatedBlock( Block block, short replication ) {
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
if( containingNodes == null )
return;
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
@@ -1700,7 +1715,7 @@
synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+block.getBlockName() + " from "+node.getName() );
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
if (containingNodes == null || ! containingNodes.contains(node)) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+block.getBlockName()+" has already been removed from node "+node );
@@ -1759,14 +1774,9 @@
NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
+block.getBlockName()+" is received from " + nodeID.getName() );
//
- // Modify the blocks->datanode map
+ // Modify the blocks->datanode map and node's map.
//
- addStoredBlock(block, node);
-
- //
- // Supplement node's blockreport
- //
- node.addBlock(block);
+ node.addBlock( addStoredBlock(block, node) );
}
/**
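A note on the toRemove/toAdd lists introduced in the block-report loop further above: the loop walks the node's blocks through getBlockIterator(), which now iterates the TreeMap's key set, and a TreeMap iterator fails fast with ConcurrentModificationException if the map is structurally modified mid-iteration. Deferring node.removeBlock and node.addBlock until after the loop avoids that, while removeStoredBlock, which touches only blocksMap, can stay inline. A self-contained sketch of the deferred-mutation pattern (stand-in types, not the patch's classes):

    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Deferred mutation: never add to or remove from a TreeMap while
    // iterating its key set, or the iterator throws
    // ConcurrentModificationException. Collect changes, apply them after.
    class DeferredMutationDemo {
        public static void main(String[] args) {
            SortedMap<Long, Long> blocks = new TreeMap<Long, Long>();
            for (long id = 0; id < 10; id++) blocks.put(id, id);

            List<Long> toRemove = new LinkedList<Long>();
            for (Iterator<Long> it = blocks.keySet().iterator(); it.hasNext(); ) {
                long id = it.next();
                if (id % 2 == 0) toRemove.add(id); // defer; mutating here would throw
            }
            for (Iterator<Long> it = toRemove.iterator(); it.hasNext(); ) {
                blocks.remove(it.next());          // safe: the map iterator is done
            }
            System.out.println(blocks.keySet());   // prints [1, 3, 5, 7, 9]
        }
    }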