Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/07/19 02:26:24 UTC

svn commit: r1148112 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/

Author: szetszwo
Date: Tue Jul 19 00:26:21 2011
New Revision: 1148112

URL: http://svn.apache.org/viewvc?rev=1148112&view=rev
Log:
HDFS-2147. Move cluster network topology to block management and fix some javac warnings.
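In short, the patch moves the NetworkTopology instance out of FSNamesystem (the old public clusterMap field) and into the new DatanodeManager owned by BlockManager. The following is an illustrative sketch only, not part of this commit; it shows the accessor chain that callers inside the namenode use after the change (the sketch class itself is hypothetical, the three getters are added or used in the diff below):

    import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
    import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
    import org.apache.hadoop.net.NetworkTopology;

    class TopologyAccessSketch {                       // hypothetical helper, for illustration
      static NetworkTopology topologyOf(FSNamesystem namesystem) {
        final BlockManager bm = namesystem.getBlockManager();
        final DatanodeManager dm = bm.getDatanodeManager();
        return dm.getNetworkTopology();                // was: namesystem.clusterMap
      }
    }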

Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Tue Jul 19 00:26:21 2011
@@ -569,6 +569,9 @@ Trunk (unreleased changes)
     HDFS-2157. Improve header comment in o.a.h.hdfs.server.namenode.NameNode.
     (atm via eli)
 
+    HDFS-2147. Move cluster network topology to block management and fix some
+    javac warnings.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Jul 19 00:26:21 2011
@@ -22,6 +22,7 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -31,6 +32,8 @@ import java.util.Random;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.Node;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -55,8 +59,9 @@ import org.apache.hadoop.hdfs.server.nam
  */
 @InterfaceAudience.Private
 public class BlockManager {
-  // Default initial capacity and load factor of map
-  public static final int DEFAULT_INITIAL_MAP_CAPACITY = 16;
+  static final Log LOG = LogFactory.getLog(BlockManager.class);
+
+  /** Default load factor of map */
   public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
 
   private final FSNamesystem namesystem;
@@ -104,7 +109,7 @@ public class BlockManager {
   //
   // Store blocks-->datanodedescriptor(s) map of corrupt replicas
   //
-  CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
+  private final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
   //
   // Keeps a Collection for every named machine containing
@@ -112,7 +117,7 @@ public class BlockManager {
   // on the machine in question.
   // Mapping: StorageID -> ArrayList<Block>
   //
-  Map<String, Collection<Block>> recentInvalidateSets =
+  private final Map<String, Collection<Block>> recentInvalidateSets =
     new TreeMap<String, Collection<Block>>();
 
   //
@@ -128,22 +133,22 @@ public class BlockManager {
   // Store set of Blocks that need to be replicated 1 or more times.
   // We also store pending replication-orders.
   //
-  public UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
-  private PendingReplicationBlocks pendingReplications;
+  public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+  private final PendingReplicationBlocks pendingReplications;
 
   //  The maximum number of replicas allowed for a block
-  public int maxReplication;
+  public final int maxReplication;
   //  How many outgoing replication streams a given node should have at one time
   public int maxReplicationStreams;
   // Minimum copies needed or else write is disallowed
-  public int minReplication;
+  public final int minReplication;
   // Default number of replicas
-  public int defaultReplication;
+  public final int defaultReplication;
   // How many entries are returned by getCorruptInodes()
-  int maxCorruptFilesReturned;
+  final int maxCorruptFilesReturned;
   
   // variable to enable check for enough racks 
-  boolean shouldCheckForEnoughRacks = true;
+  final boolean shouldCheckForEnoughRacks;
 
   /**
    * Last block index used for replication work.
@@ -152,28 +157,18 @@ public class BlockManager {
   Random r = new Random();
 
   // for block replicas placement
-  public BlockPlacementPolicy replicator;
+  public final BlockPlacementPolicy replicator;
 
   public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
-    this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
-  }
-  
-  BlockManager(FSNamesystem fsn, Configuration conf, int capacity)
-      throws IOException {
     namesystem = fsn;
+    datanodeManager = new DatanodeManager(fsn);
+
+    blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
+    replicator = BlockPlacementPolicy.getInstance(
+        conf, namesystem, datanodeManager.getNetworkTopology());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
-    setConfigurationParameters(conf);
-    blocksMap = new BlocksMap(capacity, DEFAULT_MAP_LOAD_FACTOR);
-    datanodeManager = new DatanodeManager(fsn);
-  }
-
-  void setConfigurationParameters(Configuration conf) throws IOException {
-    this.replicator = BlockPlacementPolicy.getInstance(
-                         conf,
-                         namesystem,
-                         namesystem.clusterMap);
 
     this.maxCorruptFilesReturned = conf.getInt(
       DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
@@ -541,6 +536,22 @@ public class BlockManager {
                             minReplication);
   }
 
+  /** Remove a datanode. */
+  public void removeDatanode(final DatanodeDescriptor node) {
+    final Iterator<? extends Block> it = node.getBlockIterator();
+    while(it.hasNext()) {
+      removeStoredBlock(it.next(), node);
+    }
+
+    node.resetBlocks();
+    removeFromInvalidates(node.getStorageID());
+    datanodeManager.getNetworkTopology().remove(node);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("remove datanode " + node.getName());
+    }
+  }
+  
   void removeFromInvalidates(String storageID, Block block) {
     Collection<Block> v = recentInvalidateSets.get(storageID);
     if (v != null && v.remove(block)) {
@@ -1002,6 +1013,29 @@ public class BlockManager {
   }
 
   /**
+   * Choose target datanodes according to the replication policy.
+   * @throws IOException if the number of targets < minimum replication.
+   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long)
+   */
+  public DatanodeDescriptor[] chooseTarget(final String src,
+      final int numOfReplicas, final DatanodeDescriptor client,
+      final HashMap<Node, Node> excludedNodes,
+      final long blocksize) throws IOException {
+    // choose targets for the new block to be allocated.
+    final DatanodeDescriptor targets[] = replicator.chooseTarget(
+        src, numOfReplicas, client, excludedNodes, blocksize);
+    if (targets.length < minReplication) {
+      throw new IOException("File " + src + " could only be replicated to " +
+                            targets.length + " nodes, instead of " +
+                            minReplication + ". There are "
+                            + getDatanodeManager().getNetworkTopology().getNumOfLeaves()
+                            + " datanode(s) running but "+excludedNodes.size() +
+                            " node(s) are excluded in this operation.");
+    }
+    return targets;
+  }
+
+  /**
    * Parse the data-nodes the block belongs to and choose one,
    * which will be the replication source.
    *
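The new BlockManager.chooseTarget(..) above folds the minReplication check, previously done inline in FSNamesystem, into block management. A minimal usage sketch, assuming the BlockManager instance and the writer's DatanodeDescriptor come from a running namenode; the path, replica count and block size are made up for illustration:

    import java.io.IOException;
    import java.util.HashMap;
    import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
    import org.apache.hadoop.net.Node;

    class ChooseTargetSketch {                         // hypothetical, for illustration
      static DatanodeDescriptor[] pickTargets(BlockManager blockManager,
          DatanodeDescriptor clientNode) throws IOException {
        final HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
        // Throws IOException when fewer than minReplication targets are found.
        return blockManager.chooseTarget("/user/example/file", 3, clientNode,
            excludedNodes, 64L * 1024 * 1024);
      }
    }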

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Tue Jul 19 00:26:21 2011
@@ -57,7 +57,7 @@ public class BlocksMap {
   
   private GSet<Block, BlockInfo> blocks;
 
-  BlocksMap(int initialCapacity, float loadFactor) {
+  BlocksMap(final float loadFactor) {
     this.capacity = computeCapacity();
     this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity);
   }

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Tue Jul 19 00:26:21 2011
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -25,8 +27,11 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.util.Daemon;
 
 /**
@@ -39,6 +44,10 @@ public class DatanodeManager {
 
   final FSNamesystem namesystem;
 
+  /** Cluster network topology */
+  private final NetworkTopology networktopology = new NetworkTopology();
+
+  /** Host names to datanode descriptors mapping. */
   private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
   
   DatanodeManager(final FSNamesystem namesystem) {
@@ -60,6 +69,24 @@ public class DatanodeManager {
     if (decommissionthread != null) decommissionthread.interrupt();
   }
 
+  /** @return the network topology. */
+  public NetworkTopology getNetworkTopology() {
+    return networktopology;
+  }
+  
+  /** Sort the located blocks by the distance to the target host. */
+  public void sortLocatedBlocks(final String targethost,
+      final List<LocatedBlock> locatedblocks) {
+    //sort the blocks
+    final DatanodeDescriptor client = getDatanodeByHost(targethost);
+    for (LocatedBlock b : locatedblocks) {
+      networktopology.pseudoSortByDistance(client, b.getLocations());
+      
+      // Move decommissioned datanodes to the bottom
+      Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
+    }    
+  }
+  
   /** @return the datanode descriptor for the host. */
   public DatanodeDescriptor getDatanodeByHost(final String host) {
     return host2DatanodeMap.getDatanodeByHost(host);
@@ -74,10 +101,12 @@ public class DatanodeManager {
       host2DatanodeMap.remove(
           namesystem.datanodeMap.put(node.getStorageID(), node));
     }
+
     host2DatanodeMap.add(node);
+    networktopology.add(node);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(getClass().getSimpleName() + ".unprotectedAddDatanode: "
+      LOG.debug(getClass().getSimpleName() + ".addDatanode: "
           + "node " + node.getName() + " is added to datanodeMap.");
     }
   }
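With this change the DatanodeManager owns both the topology and the replica sorting that getBlockLocations relies on. A rough usage sketch, assuming the DatanodeManager and the located-block list are obtained elsewhere in the namenode, and with "client.example.com" standing in for the reading client's host name:

    import java.util.List;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
    import org.apache.hadoop.net.NetworkTopology;

    class SortBlocksSketch {                           // hypothetical, for illustration
      static void sortForClient(DatanodeManager dm, List<LocatedBlock> blocks) {
        // Closest replicas move to the front of each block's location list,
        // with decommissioned datanodes pushed to the bottom.
        dm.sortLocatedBlocks("client.example.com", blocks);
        final NetworkTopology topology = dm.getNetworkTopology();
        System.out.println("racks=" + topology.getNumOfRacks()
            + ", datanodes=" + topology.getNumOfLeaves());
      }
    }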

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jul 19 00:26:21 2011
@@ -138,7 +138,6 @@ import org.apache.hadoop.net.CachedDNSTo
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -240,7 +239,7 @@ public class FSNamesystem implements FSC
   // Stores the correct file name hierarchy
   //
   public FSDirectory dir;
-  public BlockManager blockManager;
+  BlockManager blockManager;
   
   // Block pool ID used by this namenode
   String blockPoolId;
@@ -318,8 +317,6 @@ public class FSNamesystem implements FSC
 
   private volatile SafeModeInfo safeMode;  // safe mode information
     
-  /** datanode network toplogy */
-  public NetworkTopology clusterMap = new NetworkTopology();
   private DNSToSwitchMapping dnsToSwitchMapping;
 
   private HostsFileReader hostsReader; 
@@ -876,15 +873,8 @@ public class FSNamesystem implements FSC
       FileNotFoundException, UnresolvedLinkException, IOException {
     LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
     if (blocks != null) {
-      //sort the blocks
-      final DatanodeDescriptor client = 
-          blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
-      for (LocatedBlock b : blocks.getLocatedBlocks()) {
-        clusterMap.pseudoSortByDistance(client, b.getLocations());
-        
-        // Move decommissioned datanodes to the bottom
-        Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
-      }
+      blockManager.getDatanodeManager().sortLocatedBlocks(
+          clientMachine, blocks.getLocatedBlocks());
     }
     return blocks;
   }
@@ -1774,16 +1764,8 @@ public class FSNamesystem implements FSC
     }
 
     // choose targets for the new block to be allocated.
-    DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
+    final DatanodeDescriptor targets[] = blockManager.chooseTarget(
         src, replication, clientNode, excludedNodes, blockSize);
-    if (targets.length < blockManager.minReplication) {
-      throw new IOException("File " + src + " could only be replicated to " +
-                            targets.length + " nodes, instead of " +
-                            blockManager.minReplication + ". There are "
-                            +clusterMap.getNumOfLeaves()+" datanode(s) running"
-                            +" but "+excludedNodes.size() +
-                            " node(s) are excluded in this operation.");
-    }
 
     // Allocate a new block and record it in the INode. 
     writeLock();
@@ -2881,14 +2863,14 @@ public class FSNamesystem implements FSC
                                       nodeReg.getStorageID());
       }
       // update cluster map
-      clusterMap.remove(nodeS);
+      blockManager.getDatanodeManager().getNetworkTopology().remove(nodeS);
       nodeS.updateRegInfo(nodeReg);
       nodeS.setHostName(hostName);
       nodeS.setDisallowed(false); // Node is in the include list
       
       // resolve network location
       resolveNetworkLocation(nodeS);
-      clusterMap.add(nodeS);
+      blockManager.getDatanodeManager().getNetworkTopology().add(nodeS);
         
       // also treat the registration message as a heartbeat
       synchronized(heartbeats) {
@@ -2919,7 +2901,6 @@ public class FSNamesystem implements FSC
       = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
     resolveNetworkLocation(nodeDescr);
     blockManager.getDatanodeManager().addDatanode(nodeDescr);
-    clusterMap.add(nodeDescr);
     checkDecommissioning(nodeDescr, dnAddress);
     
     // also treat the registration message as a heartbeat
@@ -3336,27 +3317,11 @@ public class FSNamesystem implements FSC
       }
     }
 
-    Iterator<? extends Block> it = nodeInfo.getBlockIterator();
-    while(it.hasNext()) {
-      blockManager.removeStoredBlock(it.next(), nodeInfo);
-    }
-    unprotectedRemoveDatanode(nodeInfo);
-    clusterMap.remove(nodeInfo);
+    blockManager.removeDatanode(nodeInfo);
 
     checkSafeMode();
   }
 
-  void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
-    assert hasWriteLock();
-    nodeDescr.resetBlocks();
-    blockManager.removeFromInvalidates(nodeDescr.getStorageID());
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug(
-          "BLOCK* NameSystem.unprotectedRemoveDatanode: "
-          + nodeDescr.getName() + " is out of service now.");
-    }
-  }
-
   FSImage getFSImage() {
     return dir.fsImage;
   }
@@ -4104,14 +4069,6 @@ public class FSNamesystem implements FSC
     return node;
   }
 
-  /** Choose a random datanode
-   * 
-   * @return a randomly chosen datanode
-   */
-  DatanodeDescriptor getRandomDatanode() {
-    return (DatanodeDescriptor)clusterMap.chooseRandom(NodeBase.ROOT);
-  }
-
   /**
    * SafeModeInfo contains information related to the safe mode.
    * <p>
@@ -4278,9 +4235,10 @@ public class FSNamesystem implements FSC
       }
       reached = -1;
       safeMode = null;
+      final NetworkTopology nt = blockManager.getDatanodeManager().getNetworkTopology();
       NameNode.stateChangeLog.info("STATE* Network topology has "
-                                   +clusterMap.getNumOfRacks()+" racks and "
-                                   +clusterMap.getNumOfLeaves()+ " datanodes");
+          + nt.getNumOfRacks() + " racks and "
+          + nt.getNumOfLeaves() + " datanodes");
       NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
                                    +blockManager.neededReplications.size()+" blocks");
     }
@@ -5866,6 +5824,11 @@ public class FSNamesystem implements FSC
     return blockPoolId;
   }
 
+  /** @return the block manager. */
+  public BlockManager getBlockManager() {
+    return blockManager;
+  }
+
   /**
    * Remove an already decommissioned data node who is neither in include nor
    * exclude hosts lists from the the list of live or dead nodes.  This is used
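The removeDatanode and unprotectedRemoveDatanode logic deleted above collapses into the single BlockManager.removeDatanode call added earlier in this patch. A minimal sketch of that delegation, assuming the caller already holds the namesystem write lock as the original FSNamesystem code did:

    import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;

    class RemoveDatanodeSketch {                       // hypothetical, for illustration
      static void remove(BlockManager blockManager, DatanodeDescriptor nodeInfo) {
        // Unmaps the node's blocks, clears its invalidate set and drops it
        // from the network topology, all inside block management now.
        blockManager.removeDatanode(nodeInfo);
      }
    }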

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Tue Jul 19 00:26:21 2011
@@ -32,14 +32,14 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.znerd.xmlenc.XMLOutputter;
@@ -61,7 +61,7 @@ public class FileChecksumServlets {
         (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
       final UserGroupInformation ugi = getUGI(request, conf);
       final NameNode namenode = (NameNode)context.getAttribute("name.node");
-      final DatanodeID datanode = namenode.getNamesystem().getRandomDatanode();
+      final DatanodeID datanode = NamenodeJspHelper.getRandomDatanode(namenode);
       try {
         final URI uri = createRedirectUri("/getFileChecksum", ugi, datanode, 
                                           request, namenode); 

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Tue Jul 19 00:26:21 2011
@@ -86,7 +86,7 @@ public class FileDataServlet extends Dfs
     if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
       // pick a random datanode
       NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
-      return nn.getNamesystem().getRandomDatanode();
+      return NamenodeJspHelper.getRandomDatanode(nn);
     }
     return JspHelper.bestNode(blks);
   }

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Tue Jul 19 00:26:21 2011
@@ -66,7 +66,8 @@ public class FsckServlet extends DfsServ
             namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE); 
           final short minReplication = namesystem.getMinReplication();
 
-          new NamenodeFsck(conf, nn, nn.getNetworkTopology(), pmap, out,
+          new NamenodeFsck(conf, nn,
+              NamenodeJspHelper.getNetworkTopology(nn), pmap, out,
               totalDatanodes, minReplication, remoteAddress).fsck();
           
           return null;

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Jul 19 00:26:21 2011
@@ -89,7 +89,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
@@ -1424,10 +1423,6 @@ public class NameNode implements Namenod
     return httpAddress;
   }
 
-  NetworkTopology getNetworkTopology() {
-    return this.namesystem.clusterMap;
-  }
-
   /**
    * Verify that configured directories exist, then
    * Interactively confirm that formatting is desired 

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Tue Jul 19 00:26:21 2011
@@ -48,6 +48,8 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ServletUtil;
@@ -368,13 +370,25 @@ class NamenodeJspHelper {
     return token == null ? null : token.encodeToUrlString();
   }
 
+  /** @return the network topology. */
+  static NetworkTopology getNetworkTopology(final NameNode namenode) {
+    return namenode.getNamesystem().getBlockManager().getDatanodeManager(
+        ).getNetworkTopology();
+  }
+
+  /** @return a randomly chosen datanode. */
+  static DatanodeDescriptor getRandomDatanode(final NameNode namenode) {
+    return (DatanodeDescriptor)getNetworkTopology(namenode).chooseRandom(
+        NodeBase.ROOT);
+  }
+  
   static void redirectToRandomDataNode(ServletContext context,
       HttpServletRequest request, HttpServletResponse resp) throws IOException,
       InterruptedException {
     final NameNode nn = (NameNode) context.getAttribute("name.node");
     final Configuration conf = (Configuration) context
         .getAttribute(JspHelper.CURRENT_CONF);
-    final DatanodeID datanode = nn.getNamesystem().getRandomDatanode();
+    final DatanodeID datanode = getRandomDatanode(nn);
     UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
     String tokenString = getDelegationToken(nn, request, conf, ugi);
     // if the user is defined, get a delegation token and stringify it
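The two package-private helpers added above replace FSNamesystem.getRandomDatanode() and NameNode.getNetworkTopology() for the servlets and JSP code. A hedged sketch of a caller, which must live in the same package since both the helpers and NamenodeJspHelper itself are package-private; the NameNode instance is assumed to come from the servlet context, as in the surrounding code:

    package org.apache.hadoop.hdfs.server.namenode;

    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
    import org.apache.hadoop.net.NetworkTopology;

    class JspHelperSketch {                            // hypothetical, for illustration
      static String describe(NameNode nn) {
        final DatanodeDescriptor d = NamenodeJspHelper.getRandomDatanode(nn);
        final NetworkTopology nt = NamenodeJspHelper.getNetworkTopology(nn);
        return d.getName() + " out of " + nt.getNumOfLeaves() + " datanodes";
      }
    }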

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Tue Jul 19 00:26:21 2011
@@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -68,9 +67,9 @@ public class TestReplicationPolicy exten
       e.printStackTrace();
       throw (RuntimeException)new RuntimeException().initCause(e);
     }
-    FSNamesystem fsNamesystem = namenode.getNamesystem();
-    replicator = fsNamesystem.blockManager.replicator;
-    cluster = fsNamesystem.clusterMap;
+    final BlockManager bm = namenode.getNamesystem().getBlockManager();
+    replicator = bm.replicator;
+    cluster = bm.getDatanodeManager().getNetworkTopology();
     // construct network topology
     for(int i=0; i<NUM_OF_DATANODES; i++) {
       cluster.add(dataNodes[i]);

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java?rev=1148112&r1=1148111&r2=1148112&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java Tue Jul 19 00:26:21 2011
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 
 public class TestUnderReplicatedBlocks extends TestCase {
   public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
@@ -44,11 +43,11 @@ public class TestUnderReplicatedBlocks e
       
       // remove one replica from the blocksMap so block becomes under-replicated
       // but the block does not get put into the under-replicated blocks queue
-      final FSNamesystem namesystem = cluster.getNamesystem();
+      final BlockManager bm = cluster.getNamesystem().getBlockManager();
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
-      DatanodeDescriptor dn = namesystem.blockManager.blocksMap.nodeIterator(b.getLocalBlock()).next();
-      namesystem.blockManager.addToInvalidates(b.getLocalBlock(), dn);
-      namesystem.blockManager.blocksMap.removeNode(b.getLocalBlock(), dn);
+      DatanodeDescriptor dn = bm.blocksMap.nodeIterator(b.getLocalBlock()).next();
+      bm.addToInvalidates(b.getLocalBlock(), dn);
+      bm.blocksMap.removeNode(b.getLocalBlock(), dn);
       
       // increment this file's replication factor
       FsShell shell = new FsShell(conf);