Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2009/05/07 00:32:35 UTC

svn commit: r772450 [2/2] - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/namenode/

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed May  6 22:32:34 2009
@@ -40,9 +40,7 @@
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
@@ -50,6 +48,7 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -127,24 +126,13 @@
   private FSNamesystemMetrics myFSMetrics;
   private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
   private int totalLoad = 0;
-  private long pendingReplicationBlocksCount = 0L, corruptReplicaBlocksCount,
-    underReplicatedBlocksCount = 0L, scheduledReplicationBlocksCount = 0L;
 
   //
   // Stores the correct file name hierarchy
   //
   public FSDirectory dir;
 
-  //
-  // Mapping: Block -> { INode, datanodes, self ref } 
-  // Updated only in response to client-sent information.
-  //
-  BlocksMap blocksMap = new BlocksMap();
-
-  //
-  // Store blocks-->datanodedescriptor(s) map of corrupt replicas
-  //
-  public CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
+  BlockManager blockManager;
     
   /**
    * Stores the datanode -> block map.  
@@ -170,24 +158,6 @@
   NavigableMap<String, DatanodeDescriptor> datanodeMap = 
     new TreeMap<String, DatanodeDescriptor>();
 
-  //
-  // Keeps a Collection for every named machine containing
-  // blocks that have recently been invalidated and are thought to live
-  // on the machine in question.
-  // Mapping: StorageID -> ArrayList<Block>
-  //
-  private Map<String, Collection<Block>> recentInvalidateSets = 
-    new TreeMap<String, Collection<Block>>();
-
-  //
-  // Keeps a TreeSet for every named node.  Each treeset contains
-  // a list of the blocks that are "extra" at that location.  We'll
-  // eventually remove these extras.
-  // Mapping: StorageID -> TreeSet<Block>
-  //
-  Map<String, Collection<Block>> excessReplicateMap = 
-    new TreeMap<String, Collection<Block>>();
-
   Random r = new Random();
 
   /**
@@ -199,14 +169,6 @@
    */
   ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
 
-  //
-  // Store set of Blocks that need to be replicated 1 or more times.
-  // We also store pending replication-orders.
-  // Set of: Block
-  //
-  private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
-  private PendingReplicationBlocks pendingReplications;
-
   public LeaseManager leaseManager = new LeaseManager(this); 
 
   //
@@ -221,14 +183,6 @@
   private volatile boolean fsRunning = true;
   long systemStart = 0;
 
-  //  The maximum number of replicas we should allow for a single block
-  private int maxReplication;
-  //  How many outgoing replication streams a given node should have at one time
-  private int maxReplicationStreams;
-  // minReplication is how many copies we need in place or else we disallow the write
-  private int minReplication;
-  // Default replication
-  private int defaultReplication;
   // heartbeatRecheckInterval is how often namenode checks for expired datanodes
   private long heartbeatRecheckInterval;
   // heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -241,22 +195,12 @@
   // allow appending to hdfs files
   private boolean supportAppends = true;
 
-  /**
-   * Last block index used for replication work.
-   */
-  private int replIndex = 0;
-  private long missingBlocksInCurIter = 0;
-  private long missingBlocksInPrevIter = 0; 
-
   private SafeModeInfo safeMode;  // safe mode information
   private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
     
  // datanode network topology
   NetworkTopology clusterMap = new NetworkTopology();
   private DNSToSwitchMapping dnsToSwitchMapping;
-  
-  // for block replicas placement
-  ReplicationTargetChooser replicator;
 
   private HostsFileReader hostsReader; 
   private Daemon dnthread = null;
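
Taken together, the state deleted in the hunks above moves into the new
BlockManager class (added in part 1/2 of this commit, not shown here). A
rough sketch of the shape those deletions imply; the field names come from
this diff, but the actual layout of the class is an assumption:

    // Sketch only: block-related state consolidated out of FSNamesystem.
    class BlockManager {
      private final FSNamesystem namesystem;

      // Mapping: Block -> { INode, datanodes, self ref }
      BlocksMap blocksMap = new BlocksMap();

      // Mapping: Block -> datanodes holding corrupt replicas
      CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();

      // Mapping: StorageID -> blocks recently invalidated on that datanode
      private Map<String, Collection<Block>> recentInvalidateSets =
          new TreeMap<String, Collection<Block>>();

      // Mapping: StorageID -> "extra" replicas to eventually remove
      Map<String, Collection<Block>> excessReplicateMap =
          new TreeMap<String, Collection<Block>>();

      private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
      private PendingReplicationBlocks pendingReplications;

      // for block replica placement
      ReplicationTargetChooser replicator;

      int defaultReplication, minReplication, maxReplication, maxReplicationStreams;
    }
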
@@ -292,8 +236,8 @@
    */
   private void initialize(Configuration conf, FSImage fsImage) throws IOException {
     this.systemStart = now();
+    this.blockManager = new BlockManager(this, conf);
     setConfigurationParameters(conf);
-
     this.registerMBean(conf); // register the MBean for the FSNamesystemStatus
     if(fsImage == null) {
       this.dir = new FSDirectory(this, conf);
@@ -308,9 +252,6 @@
       this.dir = new FSDirectory(fsImage, this, conf);
     }
     this.safeMode = new SafeModeInfo(conf);
-    pendingReplications = new PendingReplicationBlocks(
-                            conf.getInt("dfs.replication.pending.timeout.sec", 
-                                        -1) * 1000L);
     this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
                         conf.get("dfs.hosts.exclude",""));
   }
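
The configuration parsing deleted from setConfigurationParameters() (the
@@ -436 hunk below) and the PendingReplicationBlocks setup deleted here
presumably move behind the two calls this hunk introduces. A hedged
reconstruction from the removed code (range checks omitted; this is not
the committed source):

    BlockManager(FSNamesystem ns, Configuration conf) throws IOException {
      namesystem = ns;
      replicator = new ReplicationTargetChooser(
          conf.getBoolean("dfs.replication.considerLoad", true), ns, ns.clusterMap);
      defaultReplication = conf.getInt("dfs.replication", 3);
      maxReplication = conf.getInt("dfs.replication.max", 512);
      minReplication = conf.getInt("dfs.replication.min", 1);
      maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
      pendingReplications = new PendingReplicationBlocks(
          conf.getInt("dfs.replication.pending.timeout.sec", -1) * 1000L);
    }

    void activate() {
      pendingReplications.start();
    }
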
@@ -320,7 +261,7 @@
    */
   void activate(Configuration conf) throws IOException {
     setBlockTotal();
-    pendingReplications.start();
+    blockManager.activate();
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(leaseManager.new Monitor());
     this.replthread = new Daemon(new ReplicationMonitor());
@@ -393,6 +334,7 @@
    * is stored
    */
   FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
+    this.blockManager = new BlockManager(this, conf);
     setConfigurationParameters(conf);
     this.dir = new FSDirectory(fsImage, this, conf);
   }
@@ -436,30 +378,6 @@
     this.defaultPermission = PermissionStatus.createImmutable(
         fsOwner.getUserName(), supergroup, new FsPermission(filePermission));
 
-
-    this.replicator = new ReplicationTargetChooser(
-                         conf.getBoolean("dfs.replication.considerLoad", true),
-                         this,
-                         clusterMap);
-    this.defaultReplication = conf.getInt("dfs.replication", 3);
-    this.maxReplication = conf.getInt("dfs.replication.max", 512);
-    this.minReplication = conf.getInt("dfs.replication.min", 1);
-    if (minReplication <= 0)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.replication.min = " 
-                            + minReplication
-                            + " must be greater than 0");
-    if (maxReplication >= (int)Short.MAX_VALUE)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.replication.max = " 
-                            + maxReplication + " must be less than " + (Short.MAX_VALUE));
-    if (maxReplication < minReplication)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.replication.min = " 
-                            + minReplication
-                            + " must be less than dfs.replication.max = " 
-                            + maxReplication);
-    this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
     long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
     this.heartbeatRecheckInterval = conf.getInt(
         "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
@@ -497,7 +415,7 @@
   public void close() {
     fsRunning = false;
     try {
-      if (pendingReplications != null) pendingReplications.stop();
+      if (blockManager != null) blockManager.close();
       if (hbthread != null) hbthread.interrupt();
       if (replthread != null) replthread.interrupt();
       if (dnthread != null) dnthread.interrupt();
@@ -534,48 +452,8 @@
                          filename);
     PrintWriter out = new PrintWriter(new BufferedWriter(
                                                          new FileWriter(file, true)));
- 
-
-    //
-    // Dump contents of neededReplication
-    //
-    synchronized (neededReplications) {
-      out.println("Metasave: Blocks waiting for replication: " + 
-                  neededReplications.size());
-      for (Block block : neededReplications) {
-        List<DatanodeDescriptor> containingNodes =
-                                          new ArrayList<DatanodeDescriptor>();
-        NumberReplicas numReplicas = new NumberReplicas();
-        // source node returned is not used
-        chooseSourceDatanode(block, containingNodes, numReplicas);
-        int usableReplicas = numReplicas.liveReplicas() + 
-                             numReplicas.decommissionedReplicas(); 
-        // l: live, d: decommissioned, c: corrupt, e: excess
-        out.print(block + " (replicas:" +
-                  " l: " + numReplicas.liveReplicas() + 
-                  " d: " + numReplicas.decommissionedReplicas() + 
-                  " c: " + numReplicas.corruptReplicas() + 
-                  " e: " + numReplicas.excessReplicas() + 
-                  ((usableReplicas > 0)? "" : " MISSING") + ")"); 
-
-        for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-             jt.hasNext();) {
-          DatanodeDescriptor node = jt.next();
-          out.print(" " + node + " : ");
-        }
-        out.println("");
-      }
-    }
 
-    //
-    // Dump blocks from pendingReplication
-    //
-    pendingReplications.metaSave(out);
-
-    //
-    // Dump blocks that are waiting to be deleted
-    //
-    dumpRecentInvalidateSets(out);
+    blockManager.metaSave(out);
 
     //
     // Dump all datanodes
@@ -597,28 +475,6 @@
   private boolean isAccessTimeSupported() {
     return accessTimePrecision > 0;
   }
-    
-  /* get replication factor of a block */
-  private int getReplication(Block block) {
-    INodeFile fileINode = blocksMap.getINode(block);
-    if (fileINode == null) { // block does not belong to any file
-      return 0;
-    }
-    assert !fileINode.isDirectory() : "Block cannot belong to a directory.";
-    return fileINode.getReplication();
-  }
-
-  /* updates a block in under replication queue */
-  synchronized void updateNeededReplications(Block block,
-                        int curReplicasDelta, int expectedReplicasDelta) {
-    NumberReplicas repl = countNodes(block);
-    int curExpectedReplicas = getReplication(block);
-    neededReplications.update(block, 
-                              repl.liveReplicas(), 
-                              repl.decommissionedReplicas(),
-                              curExpectedReplicas,
-                              curReplicasDelta, expectedReplicasDelta);
-  }
 
   /////////////////////////////////////////////////////////
   //
@@ -641,7 +497,7 @@
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
           + "Asking for blocks from an unrecorded node " + datanode.getName());
       throw new IllegalArgumentException(
-          "Unexpected exception.  Got getBlocks message for datanode " + 
+          "Unexpected exception.  Got getBlocks message for datanode " +
           datanode.getName() + ", but there is no info for it");
     }
 
@@ -666,27 +522,17 @@
         totalSize += addBlock(iter.next(), results);
       }
     }
-    
+
     return new BlocksWithLocations(
         results.toArray(new BlockWithLocations[results.size()]));
   }
-  
+
   /**
    * Get all valid locations of the block & add the block to results
    * return the length of the added block; 0 if the block is not added
    */
   private long addBlock(Block block, List<BlockWithLocations> results) {
-    ArrayList<String> machineSet =
-      new ArrayList<String>(blocksMap.numNodes(block));
-    for(Iterator<DatanodeDescriptor> it = 
-      blocksMap.nodeIterator(block); it.hasNext();) {
-      String storageID = it.next().getStorageID();
-      // filter out invalidated replicas
-      Collection<Block> blocks = recentInvalidateSets.get(storageID); 
-      if(blocks==null || !blocks.contains(block)) {
-        machineSet.add(storageID);
-      }
-    }
+    ArrayList<String> machineSet = blockManager.addBlock(block);
     if(machineSet.size() == 0) {
       return 0;
     } else {
@@ -821,58 +667,9 @@
     if (blocks.length == 0) {
       return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
     }
-    List<LocatedBlock> results;
-    results = new ArrayList<LocatedBlock>(blocks.length);
-
-    int curBlk = 0;
-    long curPos = 0, blkSize = 0;
-    int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
-    for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
-      blkSize = blocks[curBlk].getNumBytes();
-      assert blkSize > 0 : "Block of size 0";
-      if (curPos + blkSize > offset) {
-        break;
-      }
-      curPos += blkSize;
-    }
-    
-    if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return null;
-    
-    long endOff = offset + length;
-    
-    do {
-      // get block locations
-      int numNodes = blocksMap.numNodes(blocks[curBlk]);
-      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
-      int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]); 
-      if (numCorruptNodes != numCorruptReplicas) {
-        LOG.warn("Inconsistent number of corrupt replicas for " + 
-            blocks[curBlk] + ". blockMap has " + numCorruptNodes + 
-            " but corrupt replicas map has " + numCorruptReplicas);
-      }
-      boolean blockCorrupt = (numCorruptNodes == numNodes);
-      int numMachineSet = blockCorrupt ? numNodes : 
-                            (numNodes - numCorruptNodes);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
-      if (numMachineSet > 0) {
-        numNodes = 0;
-        for(Iterator<DatanodeDescriptor> it = 
-            blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          DatanodeDescriptor dn = it.next();
-          boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
-          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
-            machineSet[numNodes++] = dn;
-        }
-      }
-      results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
-                  blockCorrupt));
-      curPos += blocks[curBlk].getNumBytes();
-      curBlk++;
-    } while (curPos < endOff 
-          && curBlk < blocks.length 
-          && results.size() < nrBlocksToReturn);
     
+    List<LocatedBlock> results = blockManager.getBlockLocations(blocks,
+        offset, length, nrBlocksToReturn);
     return inode.createLocatedBlocks(results);
   }
 
@@ -935,7 +732,7 @@
                                              ) throws IOException {
     if (isInSafeMode())
       throw new SafeModeException("Cannot set replication for " + src, safeMode);
-    verifyReplication(src, replication, null);
+    blockManager.verifyReplication(src, replication, null);
     if (isPermissionEnabled) {
       checkPathAccess(src, FsAction.WRITE);
     }
@@ -951,14 +748,14 @@
 
     // update needReplication priority queues
     for(int idx = 0; idx < fileBlocks.length; idx++)
-      updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
+      blockManager.updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
       
     if (oldRepl > replication) {  
       // old replication > the new one; need to remove copies
       LOG.info("Reducing replication for file " + src 
                + ". New replication is " + replication);
       for(int idx = 0; idx < fileBlocks.length; idx++)
-        processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
+        blockManager.processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
     } else { // replication factor is increased
       LOG.info("Increasing replication for file " + src 
           + ". New replication is " + replication);
@@ -972,33 +769,6 @@
     }
     return dir.getPreferredBlockSize(filename);
   }
-    
-  /**
-   * Check whether the replication parameter is within the range
-   * determined by system configuration.
-   */
-  private void verifyReplication(String src, 
-                                 short replication, 
-                                 String clientName 
-                                 ) throws IOException {
-    
-    if (replication >= minReplication && replication <= maxReplication) {
-      //common case. avoid building 'text'
-      return;
-    }
-    
-    String text = "file " + src 
-      + ((clientName != null) ? " on client " + clientName : "")
-      + ".\n"
-      + "Requested replication " + replication;
-
-    if (replication > maxReplication)
-      throw new IOException(text + " exceeds maximum " + maxReplication);
-      
-    if (replication < minReplication)
-      throw new IOException( 
-                            text + " is less than the required minimum " + minReplication);
-  }
 
   /**
    * Create a new file entry in the namespace.
@@ -1107,7 +877,7 @@
       }
 
       try {
-        verifyReplication(src, replication, clientMachine);
+        blockManager.verifyReplication(src, replication, clientMachine);
       } catch(IOException e) {
         throw new IOException("failed to create "+e.getMessage());
       }
@@ -1188,7 +958,7 @@
                             " Please refer to dfs.support.append configuration parameter.");
     }
     startFileInternal(src, null, holder, clientMachine, false, true, 
-                      (short)maxReplication, (long)0);
+                      (short)blockManager.maxReplication, (long)0);
     getEditLog().logSync();
 
     //
@@ -1203,15 +973,11 @@
       Block[] blocks = file.getBlocks();
       if (blocks != null && blocks.length > 0) {
         Block last = blocks[blocks.length-1];
-        BlockInfo storedBlock = blocksMap.getStoredBlock(last);
+        BlockInfo storedBlock = blockManager.getStoredBlock(last);
         if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
           long fileLength = file.computeContentSummary().getLength();
-          DatanodeDescriptor[] targets = new DatanodeDescriptor[blocksMap.numNodes(last)];
-          Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
-          for (int i = 0; it != null && it.hasNext(); i++) {
-            targets[i] = it.next();
-          }
-          // remove the replica locations of this block from the blocksMap
+          DatanodeDescriptor[] targets = blockManager.getNodes(last);
+          // remove the replica locations of this block from the node
           for (int i = 0; i < targets.length; i++) {
             targets[i].removeBlock(storedBlock);
           }
@@ -1222,17 +988,14 @@
                                 fileLength-storedBlock.getNumBytes());
 
           // Remove block from replication queue.
-          updateNeededReplications(last, 0, 0);
+          blockManager.updateNeededReplications(last, 0, 0);
 
           // remove this block from the list of pending blocks to be deleted. 
           // This reduces the possibility of triggering HADOOP-1349.
           //
           for (DatanodeDescriptor dd : targets) {
             String datanodeId = dd.getStorageID();
-            Collection<Block> v = recentInvalidateSets.get(datanodeId);
-            if (v != null && v.remove(last) && v.isEmpty()) {
-              recentInvalidateSets.remove(datanodeId);
-            }
+            blockManager.removeFromInvalidates(datanodeId, last);
           }
         }
       }
@@ -1298,15 +1061,13 @@
       replication = (int)pendingFile.getReplication();
     }
 
-    // choose targets for the new block tobe allocated.
-    DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
-                                                           clientNode,
-                                                           null,
-                                                           blockSize);
-    if (targets.length < this.minReplication) {
+    // choose targets for the new block to be allocated.
+    DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
+        replication, clientNode, null, blockSize);
+    if (targets.length < blockManager.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
-                            minReplication);
+                            blockManager.minReplication);
     }
 
     // Allocate a new block and record it in the INode. 
@@ -1447,14 +1208,7 @@
     Block[] pendingBlocks = file.getBlocks();
     int nrBlocks = pendingBlocks.length;
     for (int i = 0; i < nrBlocks; i++) {
-      // filter out containingNodes that are marked for decommission.
-      NumberReplicas number = countNodes(pendingBlocks[i]);
-      if (number.liveReplicas() < numExpectedReplicas) {
-        neededReplications.add(pendingBlocks[i], 
-                               number.liveReplicas(), 
-                               number.decommissionedReplicas,
-                               numExpectedReplicas);
-      }
+      blockManager.checkReplication(pendingBlocks[i], numExpectedReplicas);
     }
   }
 
@@ -1490,7 +1244,7 @@
       // check all blocks of the file.
       //
       for (Block block: v.getBlocks()) {
-        if (blocksMap.numNodes(block) < this.minReplication) {
+        if (!blockManager.checkMinReplication(block)) {
           return false;
         }
       }
@@ -1499,78 +1253,13 @@
       // check the penultimate block of this file
       //
       Block b = v.getPenultimateBlock();
-      if (b != null) {
-        if (blocksMap.numNodes(b) < this.minReplication) {
-          return false;
-        }
+      if (b != null && !blockManager.checkMinReplication(b)) {
+        return false;
       }
     }
     return true;
   }
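
The per-block replica-count comparisons replaced above collapse into a
single predicate on the BlockManager side; presumably something like the
following, reconstructed from the removed expression (not the committed
source):

    boolean checkMinReplication(Block block) {
      // true once the block has at least dfs.replication.min replicas
      return blocksMap.numNodes(block) >= minReplication;
    }
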
 
-  /**
-   * Remove a datanode from the invalidatesSet
-   * @param n datanode
-   */
-  private void removeFromInvalidates(DatanodeInfo n) {
-    recentInvalidateSets.remove(n.getStorageID());
-  }
-
-  /**
-   * Adds block to list of blocks which will be invalidated on 
-   * specified datanode and logs the move
-   * @param b block
-   * @param n datanode
-   */
-  void addToInvalidates(Block b, DatanodeInfo n) {
-    addToInvalidatesNoLog(b, n);
-    NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-        + b.getBlockName() + " is added to invalidSet of " + n.getName());
-  }
-
-  /**
-   * Adds block to list of blocks which will be invalidated on 
-   * specified datanode
-   * @param b block
-   * @param n datanode
-   */
-  void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
-    Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
-    if (invalidateSet == null) {
-      invalidateSet = new HashSet<Block>();
-      recentInvalidateSets.put(n.getStorageID(), invalidateSet);
-    }
-    invalidateSet.add(b);
-  }
-  
-  /**
-   * Adds block to list of blocks which will be invalidated on 
-   * all its datanodes.
-   */
-  private void addToInvalidates(Block b) {
-    for (Iterator<DatanodeDescriptor> it = 
-                                blocksMap.nodeIterator(b); it.hasNext();) {
-      DatanodeDescriptor node = it.next();
-      addToInvalidates(b, node);
-    }
-  }
-
-  /**
-   * dumps the contents of recentInvalidateSets
-   */
-  private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
-    int size = recentInvalidateSets.values().size();
-    out.println("Metasave: Blocks waiting deletion from "+size+" datanodes.");
-    if (size == 0) {
-      return;
-    }
-    for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
-      Collection<Block> blocks = entry.getValue();
-      if (blocks.size() > 0) {
-        out.println(datanodeMap.get(entry.getKey()).getName() + blocks);
-      }
-    }
-  }
 
   /**
    * Mark the block belonging to datanode as corrupt
@@ -1579,75 +1268,9 @@
    */
   public synchronized void markBlockAsCorrupt(Block blk, DatanodeInfo dn)
     throws IOException {
-    DatanodeDescriptor node = getDatanode(dn);
-    if (node == null) {
-      throw new IOException("Cannot mark block" + blk.getBlockName() +
-                            " as corrupt because datanode " + dn.getName() +
-                            " does not exist. ");
-    }
-    
-    final BlockInfo storedBlockInfo = blocksMap.getStoredBlock(blk);
-    if (storedBlockInfo == null) {
-      // Check if the replica is in the blockMap, if not 
-      // ignore the request for now. This could happen when BlockScanner
-      // thread of Datanode reports bad block before Block reports are sent
-      // by the Datanode on startup
-      NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
-                                   "block " + blk + " could not be marked " +
-                                   "as corrupt as it does not exist in " +
-                                   "blocksMap");
-    } else {
-      INodeFile inode = storedBlockInfo.getINode();
-      if (inode == null) {
-        NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
-                                     "block " + blk + " could not be marked " +
-                                     "as corrupt as it does not belong to " +
-                                     "any file");
-        addToInvalidates(storedBlockInfo, node);
-        return;
-      } 
-      // Add this replica to corruptReplicas Map 
-      corruptReplicas.addToCorruptReplicasMap(storedBlockInfo, node);
-      if (countNodes(storedBlockInfo).liveReplicas()>inode.getReplication()) {
-        // the block is over-replicated so invalidate the replicas immediately
-        invalidateBlock(storedBlockInfo, node);
-      } else {
-        // add the block to neededReplication 
-        updateNeededReplications(storedBlockInfo, -1, 0);
-      }
-    }
+    blockManager.markBlockAsCorrupt(blk, dn);
   }
 
-  /**
-   * Invalidates the given block on the given datanode.
-   */
-  public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
-    throws IOException {
-    NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: " 
-                                 + blk + " on " 
-                                 + dn.getName());
-    DatanodeDescriptor node = getDatanode(dn);
-    if (node == null) {
-      throw new IOException("Cannot invalidate block " + blk +
-                            " because datanode " + dn.getName() +
-                            " does not exist.");
-    }
-
-    // Check how many copies we have of the block.  If we have at least one
-    // copy on a live node, then we can delete it. 
-    int count = countNodes(blk).liveReplicas();
-    if (count > 1) {
-      addToInvalidates(blk, dn);
-      removeStoredBlock(blk, node);
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
-                                   + blk + " on " 
-                                   + dn.getName() + " listed for deletion.");
-    } else {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
-                                   + blk + " on " 
-                                   + dn.getName() + " is the only copy and was not deleted.");
-    }
-  }
 
   ////////////////////////////////////////////////////////////////
   // Here's how to handle block-copy failure during client write:
@@ -1738,9 +1361,7 @@
   void removePathAndBlocks(String src, List<Block> blocks) {
     leaseManager.removeLeaseWithPrefixPath(src);
     for(Block b : blocks) {
-      blocksMap.removeINode(b);
-      corruptReplicas.removeFromCorruptReplicasMap(b);
-      addToInvalidates(b);
+      blockManager.removeBlock(b);
     }
   }
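
blockManager.removeBlock(b) presumably bundles exactly the three steps it
replaces here: drop the block-to-INode mapping, clear any corrupt-replica
records, and queue every replica for deletion. A sketch (not the committed
source):

    void removeBlock(Block block) {
      blocksMap.removeINode(block);
      corruptReplicas.removeFromCorruptReplicasMap(block);
      addToInvalidates(block);  // schedule deletion on every datanode holding it
    }
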
 
@@ -1883,16 +1504,11 @@
           + " internalReleaseLease: No blocks found, lease removed.");
         return;
       }
-      // setup the Inode.targets for the last block from the blocksMap
+      // setup the Inode.targets for the last block from the blockManager
       //
       Block[] blocks = pendingFile.getBlocks();
       Block last = blocks[blocks.length-1];
-      DatanodeDescriptor[] targets = 
-         new DatanodeDescriptor[blocksMap.numNodes(last)];
-      Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
-      for (int i = 0; it != null && it.hasNext(); i++) {
-        targets[i] = it.next();
-      }
+      DatanodeDescriptor[] targets = blockManager.getNodes(last);
       pendingFile.setTargets(targets);
     }
     // start lease recovery of the last block for this file.
@@ -1926,7 +1542,7 @@
           + ", closeFile=" + closeFile
           + ", deleteBlock=" + deleteblock
           + ")");
-    final BlockInfo oldblockinfo = blocksMap.getStoredBlock(lastblock);
+    final BlockInfo oldblockinfo = blockManager.getStoredBlock(lastblock);
     if (oldblockinfo == null) {
       throw new IOException("Block (=" + lastblock + ") not found");
     }
@@ -1941,7 +1557,7 @@
 
    // Remove old block from blocks map. This always has to be done
     // because the generation stamp of this block is changing.
-    blocksMap.removeBlock(oldblockinfo);
+    blockManager.removeBlockFromMap(oldblockinfo);
 
     if (deleteblock) {
       pendingFile.removeBlock(lastblock);
@@ -1949,10 +1565,10 @@
     else {
       // update last block, construct newblockinfo and add it to the blocks map
       lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
-      final BlockInfo newblockinfo = blocksMap.addINode(lastblock, pendingFile);
+      final BlockInfo newblockinfo = blockManager.addINode(lastblock, pendingFile);
 
       // find the DatanodeDescriptor objects
-      // There should be no locations in the blocksMap till now because the
+      // There should be no locations in the blockManager till now because the
       // file is underConstruction
       DatanodeDescriptor[] descriptors = null;
       if (newtargets.length > 0) {
@@ -1962,7 +1578,7 @@
         }
       }
       if (closeFile) {
-        // the file is getting closed. Insert block locations into blocksMap.
+        // the file is getting closed. Insert block locations into blockManager.
         // Otherwise fsck will report these blocks as MISSING, especially if the
         // blocksReceived from Datanodes take a long time to arrive.
         for (int i = 0; i < descriptors.length; i++) {
@@ -2280,7 +1896,7 @@
         ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2);
         //check pending replication
         cmd = nodeinfo.getReplicationCommand(
-              maxReplicationStreams - xmitsInProgress);
+              blockManager.maxReplicationStreams - xmitsInProgress);
         if (cmd != null) {
           cmds.add(cmd);
         }
@@ -2351,7 +1967,7 @@
       while (fsRunning) {
         try {
           computeDatanodeWork();
-          processPendingReplications();
+          blockManager.processPendingReplications();
           Thread.sleep(replicationRecheckInterval);
         } catch (InterruptedException ie) {
           LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
@@ -2393,405 +2009,20 @@
           * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
     }
 
-    workFound = computeReplicationWork(blocksToProcess); 
+    workFound = blockManager.computeReplicationWork(blocksToProcess);
     
     // Update FSNamesystemMetrics counters
     synchronized (this) {
-      pendingReplicationBlocksCount = pendingReplications.size();
-      underReplicatedBlocksCount = neededReplications.size();
-      scheduledReplicationBlocksCount = workFound;
-      corruptReplicaBlocksCount = corruptReplicas.size();
+      blockManager.updateState();
+      blockManager.scheduledReplicationBlocksCount = workFound;
     }
     
-    workFound += computeInvalidateWork(nodesToProcess);
+    workFound += blockManager.computeInvalidateWork(nodesToProcess);
     return workFound;
   }
 
-  /**
-   * Schedule blocks for deletion at datanodes
-   * @param nodesToProcess number of datanodes to schedule deletion work
-   * @return total number of block for deletion
-   */
-  int computeInvalidateWork(int nodesToProcess) {
-    int numOfNodes = recentInvalidateSets.size();
-    nodesToProcess = Math.min(numOfNodes, nodesToProcess);
-    
-    // get an array of the keys
-    ArrayList<String> keyArray =
-      new ArrayList<String>(recentInvalidateSets.keySet());
-
-    // randomly pick <i>nodesToProcess</i> nodes 
-    // and put them at [0, nodesToProcess)
-    int remainingNodes = numOfNodes - nodesToProcess;
-    if (nodesToProcess < remainingNodes) {
-      for(int i=0; i<nodesToProcess; i++) {
-        int keyIndex = r.nextInt(numOfNodes-i)+i;
-        Collections.swap(keyArray, keyIndex, i); // swap to front
-      }
-    } else {
-      for(int i=0; i<remainingNodes; i++) {
-        int keyIndex = r.nextInt(numOfNodes-i);
-        Collections.swap(keyArray, keyIndex, numOfNodes-i-1); // swap to end
-      }
-    }
-    
-    int blockCnt = 0;
-    for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
-      blockCnt += invalidateWorkForOneNode(keyArray.get(nodeCnt));
-    }
-    return blockCnt;
-  }
-
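
The node selection above is the first few steps of a Fisher-Yates shuffle,
run from whichever end of the key array needs fewer swaps. A standalone
illustration of the front-swapping case; the helper below is hypothetical,
not part of this commit:

    import java.util.*;

    class RandomPick {
      /** Move k uniformly chosen, distinct elements to the front of the list. */
      static <T> List<T> pickRandom(List<T> list, int k, Random r) {
        k = Math.min(k, list.size());
        for (int i = 0; i < k; i++) {
          int j = i + r.nextInt(list.size() - i);  // uniform over [i, n)
          Collections.swap(list, i, j);            // move the pick into slot i
        }
        return list.subList(0, k);
      }
    }
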
-  /**
-   * Scan blocks in {@link #neededReplications} and assign replication
-   * work to data-nodes they belong to. 
-   * 
-   * The number of blocks processed equals either twice the number of live 
-   * data-nodes or the number of under-replicated blocks, whichever is less.
-   * 
-   * @return number of blocks scheduled for replication during this iteration.
-   */
-  private int computeReplicationWork(
-                                  int blocksToProcess) throws IOException {
-    // Choose the blocks to be replicated
-    List<List<Block>> blocksToReplicate = 
-      chooseUnderReplicatedBlocks(blocksToProcess);
-
-    // replicate blocks
-    int scheduledReplicationCount = 0;
-    for (int i=0; i<blocksToReplicate.size(); i++) {
-      for(Block block : blocksToReplicate.get(i)) {
-        if (computeReplicationWorkForBlock(block, i)) {
-          scheduledReplicationCount++;
-        }
-      }
-    }
-    return scheduledReplicationCount;
-  }
-  
-  /** Get a list of block lists to be replicated.
-   * The index of a block list represents its replication priority.
-   * 
-   * @param blocksToProcess the maximum number of blocks to choose
-   * @return a list of block lists to be replicated; 
-   *         the block list index represents its replication priority.
-   */
-  synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
-    // initialize data structure for the return value
-    List<List<Block>> blocksToReplicate = 
-      new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);
-    for (int i=0; i<UnderReplicatedBlocks.LEVEL; i++) {
-      blocksToReplicate.add(new ArrayList<Block>());
-    }
-    
-    synchronized(neededReplications) {
-      if (neededReplications.size() == 0) {
-        missingBlocksInCurIter = 0;
-        missingBlocksInPrevIter = 0;
-        return blocksToReplicate;
-      }
-      
-      // Go through all blocks that need replications.
-      BlockIterator neededReplicationsIterator = neededReplications.iterator();
-      // skip to the first unprocessed block, which is at replIndex 
-      for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
-        neededReplicationsIterator.next();
-      }
-      // # of blocks to process equals either twice the number of live 
-      // data-nodes or the number of under-replicated blocks whichever is less
-      blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
-
-      for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
-        if( ! neededReplicationsIterator.hasNext()) {
-          // start from the beginning
-          replIndex = 0;
-          missingBlocksInPrevIter = missingBlocksInCurIter;
-          missingBlocksInCurIter = 0;
-          blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
-          if(blkCnt >= blocksToProcess)
-            break;
-          neededReplicationsIterator = neededReplications.iterator();
-          assert neededReplicationsIterator.hasNext() : 
-                                  "neededReplications should not be empty.";
-        }
-
-        Block block = neededReplicationsIterator.next();
-        int priority = neededReplicationsIterator.getPriority();
-        if (priority < 0 || priority >= blocksToReplicate.size()) {
-          LOG.warn("Unexpected replication priority: " + priority + " " + block);
-        } else {
-          blocksToReplicate.get(priority).add(block);
-        }
-      } // end for
-    } // end synchronized
-    return blocksToReplicate;
-  }
-  
-  /** Replicate a block
-   * 
-   * @param block block to be replicated
-   * @param priority a hint of its priority in the neededReplication queue
-   * @return if the block gets replicated or not
-   */
-  boolean computeReplicationWorkForBlock(Block block, int priority) {
-    int requiredReplication, numEffectiveReplicas; 
-    List<DatanodeDescriptor> containingNodes;
-    DatanodeDescriptor srcNode;
-    
-    synchronized (this) {
-      synchronized (neededReplications) {
-        // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) { 
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
-        }
-        requiredReplication = fileINode.getReplication(); 
-
-        // get a source data-node
-        containingNodes = new ArrayList<DatanodeDescriptor>();
-        NumberReplicas numReplicas = new NumberReplicas();
-        srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
-        if ((numReplicas.liveReplicas() + numReplicas.decommissionedReplicas())
-            <= 0) {          
-          missingBlocksInCurIter++;
-        }
-        if(srcNode == null) // block can not be replicated from any node
-          return false;
-
-        // do not schedule more if enough replicas are already pending
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-                                pendingReplications.getNumReplicas(block);
-        if(numEffectiveReplicas >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          NameNode.stateChangeLog.info("BLOCK* "
-              + "Removing block " + block
-              + " from neededReplications as it has enough replicas.");
-          return false;
-        }
-      }
-    }
-
-    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
-    DatanodeDescriptor targets[] = replicator.chooseTarget(
-        requiredReplication - numEffectiveReplicas,
-        srcNode, containingNodes, null, block.getNumBytes());
-    if(targets.length == 0)
-      return false;
-
-    synchronized (this) {
-      synchronized (neededReplications) {
-        // Recheck since global lock was released
-        // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) { 
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
-        }
-        requiredReplication = fileINode.getReplication(); 
-
-        // do not schedule more if enough replicas are already pending
-        NumberReplicas numReplicas = countNodes(block);
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-                               pendingReplications.getNumReplicas(block);
-        if(numEffectiveReplicas >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          NameNode.stateChangeLog.info("BLOCK* "
-              + "Removing block " + block
-              + " from neededReplications as it has enough replicas.");
-          return false;
-        }
-
-        // Add block to the to be replicated list
-        srcNode.addBlockToBeReplicated(block, targets);
-
-        for (DatanodeDescriptor dn : targets) {
-          dn.incBlocksScheduled();
-        }
-        
-        // Move the block-replication into a "pending" state.
-        // The reason we use 'pending' is so we can retry
-        // replications that fail after an appropriate amount of time.
-        pendingReplications.add(block, targets.length);
-        NameNode.stateChangeLog.debug(
-            "BLOCK* block " + block
-            + " is moved from neededReplications to pendingReplications");
-
-        // remove from neededReplications
-        if(numEffectiveReplicas + targets.length >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-        }
-        if (NameNode.stateChangeLog.isInfoEnabled()) {
-          StringBuffer targetList = new StringBuffer("datanode(s)");
-          for (int k = 0; k < targets.length; k++) {
-            targetList.append(' ');
-            targetList.append(targets[k].getName());
-          }
-          NameNode.stateChangeLog.info(
-                    "BLOCK* ask "
-                    + srcNode.getName() + " to replicate "
-                    + block + " to " + targetList);
-          NameNode.stateChangeLog.debug(
-                    "BLOCK* neededReplications = " + neededReplications.size()
-                    + " pendingReplications = " + pendingReplications.size());
-        }
-      }
-    }
-    
-    return true;
-  }
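
Note the locking discipline in the method above: candidacy is checked under
the global lock, the expensive chooseTarget() call runs with the lock
released, and the decision is re-validated before any state is mutated. A
minimal generic sketch of this check / compute / recheck shape, where
stillNeeded, chooseTargets and commit are hypothetical stand-ins:

    boolean scheduleWork(Block block) {
      synchronized (this) {
        if (!stillNeeded(block)) return false;       // check under the lock
      }
      DatanodeDescriptor[] targets = chooseTargets(block);  // slow; lock released
      if (targets.length == 0) return false;
      synchronized (this) {
        if (!stillNeeded(block)) return false;       // recheck: state may have changed
        commit(block, targets);                      // mutate only while consistent
        return true;
      }
    }
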
-
-  /**
-   * Parse the data-nodes the block belongs to and choose one,
-   * which will be the replication source.
-   * 
-   * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
-   * since the former do not have write traffic and hence are less busy.
-   * We do not use already decommissioned nodes as a source.
-   * Otherwise we choose a random node among those that did not reach their 
-   * replication limit.
-   * 
-   * In addition, form a list of all nodes containing the block
-   * and calculate its replication numbers.
-   */
-  private DatanodeDescriptor chooseSourceDatanode(
-                                    Block block,
-                                    List<DatanodeDescriptor> containingNodes,
-                                    NumberReplicas numReplicas) {
-    containingNodes.clear();
-    DatanodeDescriptor srcNode = null;
-    int live = 0;
-    int decommissioned = 0;
-    int corrupt = 0;
-    int excess = 0;
-    Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
-    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
-    while(it.hasNext()) {
-      DatanodeDescriptor node = it.next();
-      Collection<Block> excessBlocks = 
-        excessReplicateMap.get(node.getStorageID());
-      if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
-        corrupt++;
-      else if (node.isDecommissionInProgress() || node.isDecommissioned())
-        decommissioned++;
-      else if (excessBlocks != null && excessBlocks.contains(block)) {
-        excess++;
-      } else {
-        live++;
-      }
-      containingNodes.add(node);
-      // Check if this replica is corrupt
-      // If so, do not select the node as src node
-      if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
-        continue;
-      if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
-        continue; // already reached replication limit
-      // the block must not be scheduled for removal on srcNode
-      if(excessBlocks != null && excessBlocks.contains(block))
-        continue;
-      // never use already decommissioned nodes
-      if(node.isDecommissioned())
-        continue;
-      // we prefer nodes that are in DECOMMISSION_INPROGRESS state
-      if(node.isDecommissionInProgress() || srcNode == null) {
-        srcNode = node;
-        continue;
-      }
-      if(srcNode.isDecommissionInProgress())
-        continue;
-      // switch to a different node randomly
-      // this prevents deterministically selecting the same node even
-      // if the node failed to replicate the block on previous iterations
-      if(r.nextBoolean())
-        srcNode = node;
-    }
-    if(numReplicas != null)
-      numReplicas.initialize(live, decommissioned, corrupt, excess);
-    return srcNode;
-  }
-
-  /**
-   * Get blocks to invalidate for <i>nodeId</i> 
-   * in {@link #recentInvalidateSets}.
-   * 
-   * @return number of blocks scheduled for removal during this iteration.
-   */
-  private synchronized int invalidateWorkForOneNode(String nodeId) {
-    // blocks should not be replicated or removed if safe mode is on
-    if (isInSafeMode())
-      return 0;
-    // get blocks to invalidate for the nodeId
-    assert nodeId != null;
-    DatanodeDescriptor dn = datanodeMap.get(nodeId);
-    if (dn == null) {
-      recentInvalidateSets.remove(nodeId);
-      return 0;
-    }
-    
-    Collection<Block> invalidateSet = recentInvalidateSets.get(nodeId);
-    if (invalidateSet == null)
-      return 0;
-
-    ArrayList<Block> blocksToInvalidate = 
-      new ArrayList<Block>(blockInvalidateLimit);
-
-    // # blocks that can be sent in one message is limited
-    Iterator<Block> it = invalidateSet.iterator();
-    for(int blkCount = 0; blkCount < blockInvalidateLimit && it.hasNext();
-                                                                blkCount++) {
-      blocksToInvalidate.add(it.next());
-      it.remove();
-    }
-
-    // If we send everything in this message, remove this node entry
-    if(!it.hasNext())
-      recentInvalidateSets.remove(nodeId);
-
-    dn.addBlocksToBeInvalidated(blocksToInvalidate);
-
-    if(NameNode.stateChangeLog.isInfoEnabled()) {
-      StringBuffer blockList = new StringBuffer();
-      for(Block blk : blocksToInvalidate) {
-        blockList.append(' ');
-        blockList.append(blk);
-      }
-      NameNode.stateChangeLog.info("BLOCK* ask "
-          + dn.getName() + " to delete " + blockList);
-    }
-    return blocksToInvalidate.size();
-  }
-
   public void setNodeReplicationLimit(int limit) {
-    this.maxReplicationStreams = limit;
-  }
-
-  /**
-   * If there were any replication requests that timed out, reap them
-   * and put them back into the neededReplication queue
-   */
-  void processPendingReplications() {
-    Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
-    if (timedOutItems != null) {
-      synchronized (this) {
-        for (int i = 0; i < timedOutItems.length; i++) {
-          NumberReplicas num = countNodes(timedOutItems[i]);
-          neededReplications.add(timedOutItems[i], 
-                                 num.liveReplicas(),
-                                 num.decommissionedReplicas(),
-                                 getReplication(timedOutItems[i]));
-        }
-      }
-      /* If we know the target datanodes where the replication timed out,
-       * we could invoke decBlocksScheduled() on them. It's OK for now.
-       */
-    }
+    blockManager.maxReplicationStreams = limit;
   }
 
   /**
@@ -2823,7 +2054,7 @@
     }
 
     for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext();) {
-      removeStoredBlock(it.next(), nodeInfo);
+      blockManager.removeStoredBlock(it.next(), nodeInfo);
     }
     unprotectedRemoveDatanode(nodeInfo);
     clusterMap.remove(nodeInfo);
@@ -2831,7 +2062,7 @@
 
   void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
     nodeDescr.resetBlocks();
-    removeFromInvalidates(nodeDescr);
+    blockManager.removeFromInvalidates(nodeDescr);
     NameNode.stateChangeLog.debug(
                                   "BLOCK* NameSystem.unprotectedRemoveDatanode: "
                                   + nodeDescr.getName() + " is out of service now.");
@@ -2948,300 +2179,11 @@
       throw new DisallowedDatanodeException(node);
     }
     
-    //
-    // Modify the (block-->datanode) map, according to the difference
-    // between the old and new block report.
-    //
-    Collection<Block> toAdd = new LinkedList<Block>();
-    Collection<Block> toRemove = new LinkedList<Block>();
-    Collection<Block> toInvalidate = new LinkedList<Block>();
-    node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate);
-        
-    for (Block b : toRemove) {
-      removeStoredBlock(b, node);
-    }
-    for (Block b : toAdd) {
-      addStoredBlock(b, node, null);
-    }
-    for (Block b : toInvalidate) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block " 
-          + b + " on " + node.getName() + " size " + b.getNumBytes()
-          + " does not belong to any file.");
-      addToInvalidates(b, node);
-    }
+    blockManager.processReport(node, newReport);
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
   }
 
   /**
-   * Modify (block-->datanode) map.  Remove block from set of 
-   * needed replications if this takes care of the problem.
-   * @return the block that is stored in blockMap.
-   */
-  synchronized Block addStoredBlock(Block block, 
-                                    DatanodeDescriptor node,
-                                    DatanodeDescriptor delNodeHint) {
-    BlockInfo storedBlock = blocksMap.getStoredBlock(block);
-    if(storedBlock == null || storedBlock.getINode() == null) {
-      // If this block does not belong to any file, then we are done.
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-                                   + "addStoredBlock request received for " 
-                                   + block + " on " + node.getName()
-                                   + " size " + block.getNumBytes()
-                                   + " But it does not belong to any file.");
-      // we could add this block to the invalidate set of this datanode;
-      // it will happen in the next block report otherwise.
-      return block;      
-    }
-     
-    // add block to the data-node
-    boolean added = node.addBlock(storedBlock);
-    
-    assert storedBlock != null : "Block must be stored by now";
-
-    if (block != storedBlock) {
-      if (block.getNumBytes() >= 0) {
-        long cursize = storedBlock.getNumBytes();
-        if (cursize == 0) {
-          storedBlock.setNumBytes(block.getNumBytes());
-        } else if (cursize != block.getNumBytes()) {
-          LOG.warn("Inconsistent size for block " + block + 
-                   " reported from " + node.getName() + 
-                   " current size is " + cursize +
-                   " reported size is " + block.getNumBytes());
-          try {
-            if (cursize > block.getNumBytes()) {
-              // new replica is smaller in size than existing block.
-              // Mark the new replica as corrupt.
-              LOG.warn("Mark new replica " + block + " from " + node.getName() + 
-                  "as corrupt because its length is shorter than existing ones");
-              markBlockAsCorrupt(block, node);
-            } else {
-              // new replica is larger in size than existing block.
-              // Mark pre-existing replicas as corrupt.
-              int numNodes = blocksMap.numNodes(block);
-              int count = 0;
-              DatanodeDescriptor nodes[] = new DatanodeDescriptor[numNodes];
-              Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
-              for (; it != null && it.hasNext(); ) {
-                DatanodeDescriptor dd = it.next();
-                if (!dd.equals(node)) {
-                  nodes[count++] = dd;
-                }
-              }
-              for (int j = 0; j < count; j++) {
-                LOG.warn("Mark existing replica " + block + " from " + node.getName() + 
-                " as corrupt because its length is shorter than the new one");
-                markBlockAsCorrupt(block, nodes[j]);
-              }
-              //
-              // change the size of block in blocksMap
-              //
-              storedBlock = blocksMap.getStoredBlock(block); //extra look up!
-              if (storedBlock == null) {
-                LOG.warn("Block " + block + 
-                   " reported from " + node.getName() + 
-                   " does not exist in blockMap. Surprise! Surprise!");
-              } else {
-                storedBlock.setNumBytes(block.getNumBytes());
-              }
-            }
-          } catch (IOException e) {
-            LOG.warn("Error in deleting bad block " + block + e);
-          }
-        }
-        
-        // Update space consumed if required.
-        INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
-        long diff = (file == null) ? 0 :
-                    (file.getPreferredBlockSize() - storedBlock.getNumBytes());
-        
-        if (diff > 0 && file.isUnderConstruction() &&
-            cursize < storedBlock.getNumBytes()) {
-          try {
-            String path = /* For finding parents */ 
-              leaseManager.findPath((INodeFileUnderConstruction)file);
-            dir.updateSpaceConsumed(path, 0, -diff*file.getReplication());
-          } catch (IOException e) {
-            LOG.warn("Unexpected exception while updating disk space : " +
-                     e.getMessage());
-          }
-        }
-      }
-      block = storedBlock;
-    }
-    assert storedBlock == block : "Block must be stored by now";
-        
-    int curReplicaDelta = 0;
-        
-    if (added) {
-      curReplicaDelta = 1;
-      // 
-      // At startup time, so many new blocks come in that
-      // they would take up lots of space in the log file.
-      // So, we log only when the namenode is out of safemode.
-      //
-      if (!isInSafeMode()) {
-        NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-                                      +"blockMap updated: "+node.getName()+" is added to "+block+" size "+block.getNumBytes());
-      }
-    } else {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
-                                   + "Redundant addStoredBlock request received for " 
-                                   + block + " on " + node.getName()
-                                   + " size " + block.getNumBytes());
-    }
-
-    // filter out containingNodes that are marked for decommission.
-    NumberReplicas num = countNodes(storedBlock);
-    int numLiveReplicas = num.liveReplicas();
-    int numCurrentReplica = numLiveReplicas
-      + pendingReplications.getNumReplicas(block);
-
-    // check whether safe replication is reached for the block
-    incrementSafeBlockCount(numCurrentReplica);
- 
-    //
-    // if file is being actively written to, then do not check 
-    // replication-factor here. It will be checked when the file is closed.
-    //
-    INodeFile fileINode = null;
-    fileINode = storedBlock.getINode();
-    if (fileINode.isUnderConstruction()) {
-      return block;
-    }
-
-    // do not handle mis-replicated blocks during startup
-    if(isInSafeMode())
-      return block;
-
-    // handle underReplication/overReplication
-    short fileReplication = fileINode.getReplication();
-    if (numCurrentReplica >= fileReplication) {
-      neededReplications.remove(block, numCurrentReplica, 
-                                num.decommissionedReplicas, fileReplication);
-    } else {
-      updateNeededReplications(block, curReplicaDelta, 0);
-    }
-    if (numCurrentReplica > fileReplication) {
-      processOverReplicatedBlock(block, fileReplication, node, delNodeHint);
-    }
-    // If the file replication has reached the desired value,
-    // we can remove any corrupt replicas the block may have.
-    int corruptReplicasCount = corruptReplicas.numCorruptReplicas(block); 
-    int numCorruptNodes = num.corruptReplicas();
-    if (numCorruptNodes != corruptReplicasCount) {
-      LOG.warn("Inconsistent number of corrupt replicas for " + 
-          block + ": blocksMap has " + numCorruptNodes + 
-          " but corrupt replicas map has " + corruptReplicasCount);
-    }
-    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) 
-      invalidateCorruptReplicas(block);
-    return block;
-  }
-
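The replication bookkeeping above boils down to one comparison per block report: live replicas plus pending transfers against the file's target. A minimal sketch of that decision, using only names visible in this diff (the wrapper method itself is hypothetical):

    // Hypothetical wrapper around the decision coded in addStoredBlock above.
    void checkReplicationState(Block block, NumberReplicas num,
                               INodeFile fileINode, int curReplicaDelta) {
      int numCurrentReplica = num.liveReplicas()
          + pendingReplications.getNumReplicas(block);
      short fileReplication = fileINode.getReplication();
      if (numCurrentReplica >= fileReplication) {
        // enough copies: drop the block from the under-replicated queue
        neededReplications.remove(block, numCurrentReplica,
                                  num.decommissionedReplicas, fileReplication);
      } else {
        updateNeededReplications(block, curReplicaDelta, 0);
      }
      if (numCurrentReplica > fileReplication) {
        // too many copies: pick excess replicas for deletion
        processOverReplicatedBlock(block, fileReplication, null, null);
      }
    }
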
-  /**
-   * Invalidate corrupt replicas.
-   * <p>
-   * This will remove the replicas from the block's location list,
-   * add them to {@link #recentInvalidateSets} so that they could be further
-   * deleted from the respective data-nodes,
-   * and remove the block from corruptReplicasMap.
-   * <p>
-   * This method should be called when the block has a sufficient
-   * number of live replicas.
-   *
-   * @param blk Block whose corrupt replicas need to be invalidated
-   */
-  void invalidateCorruptReplicas(Block blk) {
-    Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
-    boolean gotException = false;
-    if (nodes == null)
-      return;
-    for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
-      DatanodeDescriptor node = it.next();
-      try {
-        invalidateBlock(blk, node);
-      } catch (IOException e) {
-        NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
-                                      "error in deleting bad block " + blk +
-                                      " on " + node + ": " + e);
-        gotException = true;
-      }
-    }
-    // Remove the block from corruptReplicasMap
-    if (!gotException)
-      corruptReplicas.removeFromCorruptReplicasMap(blk);
-  }
-
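As the javadoc above requires, the caller must make sure enough healthy copies exist before invalidating the corrupt ones; the guard used earlier in addStoredBlock is the pattern. A minimal sketch of that call site, with the condition names taken from this diff:

    // Only invalidate corrupt replicas once enough live copies exist.
    int corruptCount = corruptReplicas.numCorruptReplicas(block);
    if (corruptCount > 0
        && countNodes(block).liveReplicas() >= fileReplication) {
      invalidateCorruptReplicas(block);
    }
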
-  /**
-   * For each block in the name-node verify whether it belongs to any file,
-   * over or under replicated. Place it into the respective queue.
-   */
-  private synchronized void processMisReplicatedBlocks() {
-    long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
-    neededReplications.clear();
-    for(BlocksMap.BlockInfo block : blocksMap.getBlocks()) {
-      INodeFile fileINode = block.getINode();
-      if(fileINode == null) {
-        // block does not belong to any file
-        nrInvalid++;
-        addToInvalidates(block);
-        continue;
-      }
-      // calculate current replication
-      short expectedReplication = fileINode.getReplication();
-      NumberReplicas num = countNodes(block);
-      int numCurrentReplica = num.liveReplicas();
-      // add to the under-replicated queue if needed
-      if (neededReplications.add(block, 
-                                 numCurrentReplica,
-                                 num.decommissionedReplicas(),
-                                 expectedReplication)) {
-        nrUnderReplicated++;
-      }
-
-      if (numCurrentReplica > expectedReplication) {
-        // over-replicated block
-        nrOverReplicated++;
-        processOverReplicatedBlock(block, expectedReplication, null, null);
-      }
-    }
-    LOG.info("Total number of blocks = " + blocksMap.size());
-    LOG.info("Number of invalid blocks = " + nrInvalid);
-    LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
-    LOG.info("Number of  over-replicated blocks = " + nrOverReplicated);
-  }
-
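Note that processMisReplicatedBlocks() is a one-shot full scan rather than an incremental update; after this patch it is reached through the facade when the namenode leaves safe mode (see the safe-mode hunk further down):

    // From the safe-mode exit path: verify replication once the
    // block picture is complete.
    blockManager.processMisReplicatedBlocks();
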
-  /**
-   * Find how many of the containing nodes are "extra", if any.
-   * If there are any extras, call chooseExcessReplicates() to
-   * mark them in the excessReplicateMap.
-   */
-  private void processOverReplicatedBlock(Block block, short replication, 
-      DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
-    if(addedNode == delNodeHint) {
-      delNodeHint = null;
-    }
-    Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
-    Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block);
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block); 
-         it.hasNext();) {
-      DatanodeDescriptor cur = it.next();
-      Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
-      if (excessBlocks == null || !excessBlocks.contains(block)) {
-        if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
-          // exclude corrupt replicas
-          if (corruptNodes == null || !corruptNodes.contains(cur)) {
-            nonExcess.add(cur);
-          }
-        }
-      }
-    }
-    chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint);    
-  }
-
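The loop above admits a replica as a deletion candidate only if it clears three independent checks. Factored out as a predicate, the filter would look roughly like this (the helper name is an assumption; the checks are verbatim from the loop):

    // Hypothetical predicate equivalent to the filter in the loop above.
    private boolean isExcessCandidate(DatanodeDescriptor cur, Block block,
        Collection<DatanodeDescriptor> corruptNodes) {
      Collection<Block> excessBlocks =
          excessReplicateMap.get(cur.getStorageID());
      return (excessBlocks == null || !excessBlocks.contains(block)) // not already excess
          && !cur.isDecommissionInProgress()
          && !cur.isDecommissioned()                                 // node is staying
          && (corruptNodes == null || !corruptNodes.contains(cur));  // replica is healthy
    }
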
-  /**
    * We want "replication" replicas of the block, but we now have too many.  
    * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
    *
@@ -3334,15 +2276,7 @@
       }
 
       nonExcess.remove(cur);
-
-      Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
-      if (excessBlocks == null) {
-        excessBlocks = new TreeSet<Block>();
-        excessReplicateMap.put(cur.getStorageID(), excessBlocks);
-      }
-      excessBlocks.add(b);
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
-                                    +"("+cur.getName()+", "+b+") is added to excessReplicateMap");
+      blockManager.addToExcessReplicate(cur, b);
 
       //
      // The 'excessBlocks' set tracks blocks until we get confirmation
@@ -3353,54 +2287,12 @@
       // should be deleted.  Items are removed from the invalidate list
       // upon giving instructions to the namenode.
       //
-      addToInvalidatesNoLog(b, cur);
+      blockManager.addToInvalidates(b, cur);
       NameNode.stateChangeLog.info("BLOCK* NameSystem.chooseExcessReplicates: "
                 +"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
     }
   }
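The map surgery deleted in this hunk does not disappear; it moves behind blockManager.addToExcessReplicate(cur, b). A plausible body, reconstructed from the deleted lines above (the real implementation lives in the new BlockManager.java, presumably in part [1/2] of this commit mail):

    // Sketch only -- reconstructed from the deleted lines above.
    void addToExcessReplicate(DatanodeDescriptor dn, Block block) {
      Collection<Block> excessBlocks =
          excessReplicateMap.get(dn.getStorageID());
      if (excessBlocks == null) {
        excessBlocks = new TreeSet<Block>();
        excessReplicateMap.put(dn.getStorageID(), excessBlocks);
      }
      excessBlocks.add(block);
      NameNode.stateChangeLog.debug("BLOCK* addToExcessReplicate: ("
          + dn.getName() + ", " + block + ") is added to excessReplicateMap");
    }
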
 
-  /**
-   * Modify (block-->datanode) map.  Possibly generate 
-   * replication tasks, if the removed block is still valid.
-   */
-  synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
-    NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
-                                  +block + " from "+node.getName());
-    if (!blocksMap.removeNode(block, node)) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
-                                    +block+" has already been removed from node "+node);
-      return;
-    }
-        
-    //
-    // It's possible that the block was removed because of a datanode
-    // failure.  If the block is still valid, check if replication is
-    // necessary.  In that case, put the block on a possibly-will-
-    // be-replicated list.
-    //
-    INode fileINode = blocksMap.getINode(block);
-    if (fileINode != null) {
-      decrementSafeBlockCount(block);
-      updateNeededReplications(block, -1, 0);
-    }
-
-    //
-    // We've removed a block from a node, so it's definitely no longer
-    // in "excess" there.
-    //
-    Collection<Block> excessBlocks = excessReplicateMap.get(node.getStorageID());
-    if (excessBlocks != null) {
-      excessBlocks.remove(block);
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
-                                    +block+" is removed from excessBlocks");
-      if (excessBlocks.size() == 0) {
-        excessReplicateMap.remove(node.getStorageID());
-      }
-    }
-    
-    // Remove the replica from corruptReplicas
-    corruptReplicas.removeFromCorruptReplicasMap(block, node);
-  }
 
   /**
    * The given node is reporting that it received a certain block.
@@ -3430,39 +2322,20 @@
       throw new DisallowedDatanodeException(node);
     }
 
-    // decrement number of blocks scheduled to this datanode.
-    node.decBlocksScheduled();
-    
-    // get the deletion hint node
-    DatanodeDescriptor delHintNode = null;
-    if(delHint!=null && delHint.length()!=0) {
-      delHintNode = datanodeMap.get(delHint);
-      if(delHintNode == null) {
-        NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
-            + block
-            + " is expected to be removed from an unrecorded node " 
-            + delHint);
-      }
-    }
-
-    //
-    // Modify the blocks->datanode map and node's map.
-    // 
-    pendingReplications.remove(block);
-    addStoredBlock(block, node, delHintNode );
+    blockManager.addBlock(node, block, delHint);
   }
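blockReceived() now collapses to a single facade call. The three steps it used to perform inline are exactly the deleted lines above, so blockManager.addBlock() presumably looks like this (sketch; the delHint lookup would go through the new getDatanode() accessor added at the end of this file):

    // Sketch of BlockManager.addBlock(), reconstructed from the deleted steps.
    void addBlock(DatanodeDescriptor node, Block block, String delHint)
        throws IOException {
      node.decBlocksScheduled();        // one scheduled transfer has completed
      DatanodeDescriptor delHintNode = null;
      if (delHint != null && delHint.length() != 0) {
        delHintNode = namesystem.getDatanode(delHint);  // may be null: warn
      }
      pendingReplications.remove(block);                // no longer in flight
      addStoredBlock(block, node, delHintNode);
    }
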
 
   public long getMissingBlocksCount() {
     // not locking
-    return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter); 
+    return blockManager.getMissingBlocksCount();
   }
   
   long[] getStats() {
     synchronized(heartbeats) {
       return new long[] {this.capacityTotal, this.capacityUsed, 
                          this.capacityRemaining,
-                         this.underReplicatedBlocksCount,
-                         this.corruptReplicaBlocksCount,
+                         getUnderReplicatedBlocks(),
+                         getCorruptReplicaBlocksCount(),
                          getMissingBlocksCount()};
     }
   }
@@ -3682,7 +2555,7 @@
       Iterator<Block> decommissionBlocks = node.getBlockIterator();
       while(decommissionBlocks.hasNext()) {
         Block block = decommissionBlocks.next();
-        updateNeededReplications(block, -1, 0);
+        blockManager.updateNeededReplications(block, -1, 0);
       }
     }
   }
@@ -3706,9 +2579,9 @@
     return new Date(systemStart); 
   }
     
-  short getMaxReplication()     { return (short)maxReplication; }
-  short getMinReplication()     { return (short)minReplication; }
-  short getDefaultReplication() { return (short)defaultReplication; }
+  short getMaxReplication()     { return (short)blockManager.maxReplication; }
+  short getMinReplication()     { return (short)blockManager.minReplication; }
+  short getDefaultReplication() { return (short)blockManager.defaultReplication; }
     
   /**
   * An immutable object that stores the number of live replicas and
@@ -3716,7 +2589,7 @@
    */
   static class NumberReplicas {
     private int liveReplicas;
-    private int decommissionedReplicas;
+    int decommissionedReplicas;
     private int corruptReplicas;
     private int excessReplicas;
 
@@ -3750,79 +2623,6 @@
   } 
 
   /**
-   * Counts the nodes in the given iterator, classifying each replica
-   * as live, decommissioned, corrupt, or excess.
-   */
-  private NumberReplicas countNodes(Block b,
-                                    Iterator<DatanodeDescriptor> nodeIter) {
-    int count = 0;
-    int live = 0;
-    int corrupt = 0;
-    int excess = 0;
-    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    while ( nodeIter.hasNext() ) {
-      DatanodeDescriptor node = nodeIter.next();
-      if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
-        corrupt++;
-      }
-      else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-        count++;
-      }
-      else  {
-        Collection<Block> blocksExcess = 
-          excessReplicateMap.get(node.getStorageID());
-        if (blocksExcess != null && blocksExcess.contains(b)) {
-          excess++;
-        } else {
-          live++;
-        }
-      }
-    }
-    return new NumberReplicas(live, count, corrupt, excess);
-  }
-
-  /**
-   * Return the replica counts (live, decommissioned, corrupt, excess) for a block.
-   */
-  NumberReplicas countNodes(Block b) {
-    return countNodes(b, blocksMap.nodeIterator(b));
-  }
-
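countNodes() places each reported location in exactly one bucket, with corruption checked first so that a corrupt replica on a decommissioning node still counts as corrupt. A usage sketch of the resulting invariant:

    // Each replica lands in exactly one bucket, so the four counters
    // sum to the number of locations recorded for this block.
    NumberReplicas num = countNodes(block);
    int reported = num.liveReplicas() + num.decommissionedReplicas()
        + num.corruptReplicas() + num.excessReplicas();
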
-  /**
-   * Return true if there are any blocks on this node that have not
-   * yet reached their replication factor. Otherwise returns false.
-   */
-  private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
-    boolean status = false;
-    for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
-      final Block block = i.next();
-      INode fileINode = blocksMap.getINode(block);
-
-      if (fileINode != null) {
-        NumberReplicas num = countNodes(block);
-        int curReplicas = num.liveReplicas();
-        int curExpectedReplicas = getReplication(block);
-        if (curExpectedReplicas > curReplicas) {
-          status = true;
-          if (!neededReplications.contains(block) &&
-            pendingReplications.getNumReplicas(block) == 0) {
-            //
-            // These blocks have been reported from the datanode
-            // after the startDecommission method has been executed. These
-            // blocks were in flight when the decommission was started.
-            //
-            neededReplications.add(block, 
-                                   curReplicas,
-                                   num.decommissionedReplicas(),
-                                   curExpectedReplicas);
-          }
-        }
-      }
-    }
-    return status;
-  }
-
-  /**
    * Change, if appropriate, the admin state of a datanode to 
    * decommission completed. Return true if decommission is complete.
    */
@@ -3832,7 +2632,7 @@
     // node has reached their target replication factor.
     //
     if (node.isDecommissionInProgress()) {
-      if (!isReplicationInProgress(node)) {
+      if (!blockManager.isReplicationInProgress(node)) {
         node.setDecommissioned();
         LOG.info("Decommission complete for node " + node.getName());
       }
@@ -4018,8 +2818,8 @@
    * During name node startup {@link SafeModeInfo} counts the number of
    * <em>safe blocks</em>, those that have at least the minimal number of
    * replicas, and calculates the ratio of safe blocks to the total number
-   * of blocks in the system, which is the size of
-   * {@link FSNamesystem#blocksMap}. When the ratio reaches the
+   * of blocks in the system, which is the number of blocks tracked by
+   * {@link FSNamesystem#blockManager}. When the ratio reaches the
    * {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
    * to monitor whether the safe mode {@link #extension} is passed.
    * Then it leaves safe mode and destroys itself.
@@ -4133,7 +2933,7 @@
         }
       }
       // verify blocks replications
-      processMisReplicatedBlocks();
+      blockManager.processMisReplicatedBlocks();
       long timeInSafemode = now() - systemStart;
       NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
                                     + timeInSafemode/1000 + " secs.");
@@ -4148,7 +2948,7 @@
                                    +clusterMap.getNumOfRacks()+" racks and "
                                    +clusterMap.getNumOfLeaves()+ " datanodes");
       NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
-                                   +neededReplications.size()+" blocks");
+                                   +blockManager.neededReplications.size()+" blocks");
     }
       
     /** 
@@ -4314,11 +3114,7 @@
       if (blockTotal == -1 && blockSafe == -1) {
         return true; // manual safe mode
       }
-      int activeBlocks = blocksMap.size();
-      for(Iterator<Collection<Block>> it = 
-            recentInvalidateSets.values().iterator(); it.hasNext();) {
-        activeBlocks -= it.next().size();
-      }
+      int activeBlocks = blockManager.getActiveBlockCount();
       return (blockTotal == activeBlocks) ||
         (blockSafe >= 0 && blockSafe <= blockTotal);
     }
@@ -4403,7 +3199,7 @@
   void decrementSafeBlockCount(Block b) {
     if (safeMode == null) // mostly true
       return;
-    safeMode.decrementSafeBlockCount((short)countNodes(b).liveReplicas());
+    safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
   }
 
   /**
@@ -4412,14 +3208,14 @@
   void setBlockTotal() {
     if (safeMode == null)
       return;
-    safeMode.setBlockTotal(blocksMap.size());
+    safeMode.setBlockTotal((int)getBlocksTotal());
   }
 
   /**
    * Get the total number of blocks in the system. 
    */
   public long getBlocksTotal() {
-    return blocksMap.size();
+    return blockManager.getTotalBlocks();
   }
 
   /**
@@ -4501,7 +3297,7 @@
    * Returns whether the given block is one pointed-to by a file.
    */
   private boolean isValidBlock(Block b) {
-    return (blocksMap.getINode(b) != null);
+    return (blockManager.getINode(b) != null);
   }
 
   // Distributed upgrade manager
@@ -4616,20 +3412,20 @@
   }
 
   public long getPendingReplicationBlocks() {
-    return pendingReplicationBlocksCount;
+    return blockManager.pendingReplicationBlocksCount;
   }
 
   public long getUnderReplicatedBlocks() {
-    return underReplicatedBlocksCount;
+    return blockManager.underReplicatedBlocksCount;
   }
 
   /** Returns number of blocks with corrupt replicas */
   public long getCorruptReplicaBlocksCount() {
-    return corruptReplicaBlocksCount;
+    return blockManager.corruptReplicaBlocksCount;
   }
 
   public long getScheduledReplicationBlocks() {
-    return scheduledReplicationBlocksCount;
+    return blockManager.scheduledReplicationBlocksCount;
   }
 
   public String getFSState() {
@@ -4738,7 +3534,7 @@
    * Increments, logs and then returns the stamp
    */
   synchronized long nextGenerationStampForBlock(Block block) throws IOException {
-    BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    BlockInfo storedBlock = blockManager.getStoredBlock(block);
     if (storedBlock == null) {
      String msg = block + " is already committed, storedBlock == null.";
       LOG.info(msg);
@@ -4849,4 +3645,13 @@
               " node namespaceID = " + registration.getNamespaceID());
     getEditLog().releaseBackupStream(registration);
   }
+
+  public int numCorruptReplicas(Block blk) {
+    return blockManager.numCorruptReplicas(blk);
+  }
+
+  /** Get a datanode descriptor given corresponding storageID */
+  DatanodeDescriptor getDatanode(String nodeID) {
+    return datanodeMap.get(nodeID);
+  }
 }
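Taken together, the call sites in this file define the surface of the new facade. Reconstructed from the hunks above (signatures are inferred from the call sites; the actual class ships in the other half of this commit):

    // Sketch: the BlockManager surface this patch exercises.
    interface BlockManagerSurface {
      void addBlock(DatanodeDescriptor node, Block block, String delHint)
          throws IOException;
      void addToInvalidates(Block b, DatanodeDescriptor dn);
      void addToExcessReplicate(DatanodeDescriptor dn, Block b);
      void updateNeededReplications(Block b, int curDelta, int expectedDelta);
      NumberReplicas countNodes(Block b);
      boolean isReplicationInProgress(DatanodeDescriptor srcNode);
      void processMisReplicatedBlocks();
      int computeInvalidateWork(int nodesToProcess);
      int getActiveBlockCount();
      long getTotalBlocks();
      long getMissingBlocksCount();
      int numCorruptReplicas(Block blk);
      BlockInfo getStoredBlock(Block block);
      INode getINode(Block b);
      // Plus package-visible state reached directly in the tests and
      // stats below: blocksMap, excessReplicateMap, neededReplications,
      // replicator, pendingReplicationBlocksCount,
      // underReplicatedBlocksCount, corruptReplicaBlocksCount,
      // scheduledReplicationBlocksCount, maxReplication, minReplication,
      // defaultReplication.
    }
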

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Wed May  6 22:32:34 2009
@@ -314,7 +314,7 @@
 
     // Loop until all corrupt replicas are reported
     int corruptReplicaSize = cluster.getNamesystem().
-                              corruptReplicas.numCorruptReplicas(blk);
+                              numCorruptReplicas(blk);
     while (corruptReplicaSize != numCorruptReplicas) {
       try {
         IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
@@ -328,7 +328,7 @@
       } catch (InterruptedException ignore) {
       }
       corruptReplicaSize = cluster.getNamesystem().
-                              corruptReplicas.numCorruptReplicas(blk);
+                              numCorruptReplicas(blk);
     }
     
     // Loop until the block recovers after replication
@@ -349,7 +349,7 @@
     // Make sure the corrupt replica is invalidated and removed from
     // corruptReplicasMap
     corruptReplicaSize = cluster.getNamesystem().
-                          corruptReplicas.numCorruptReplicas(blk);
+                          numCorruptReplicas(blk);
     while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
       try {
         LOG.info("Looping until corrupt replica is invalidated");
@@ -357,7 +357,7 @@
       } catch (InterruptedException ignore) {
       }
       corruptReplicaSize = cluster.getNamesystem().
-                            corruptReplicas.numCorruptReplicas(blk);
+                            numCorruptReplicas(blk);
       blocks = dfsClient.namenode.
                  getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
       replicaCount = blocks.get(0).getLocations().length;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java Wed May  6 22:32:34 2009
@@ -31,23 +31,23 @@
         for(int j=0; j<3*namesystem.blockInvalidateLimit+1; j++) {
           Block block = new Block(i*(namesystem.blockInvalidateLimit+1)+j, 0, 
               GenerationStamp.FIRST_VALID_STAMP);
-          namesystem.addToInvalidatesNoLog(block, nodes[i]);
+          namesystem.blockManager.addToInvalidates(block, nodes[i]);
         }
       }
       
       assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES, 
-          namesystem.computeInvalidateWork(NUM_OF_DATANODES+1));
+          namesystem.blockManager.computeInvalidateWork(NUM_OF_DATANODES+1));
       assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES, 
-          namesystem.computeInvalidateWork(NUM_OF_DATANODES));
+          namesystem.blockManager.computeInvalidateWork(NUM_OF_DATANODES));
       assertEquals(namesystem.blockInvalidateLimit*(NUM_OF_DATANODES-1), 
-          namesystem.computeInvalidateWork(NUM_OF_DATANODES-1));
-      int workCount = namesystem.computeInvalidateWork(1);
+          namesystem.blockManager.computeInvalidateWork(NUM_OF_DATANODES-1));
+      int workCount = namesystem.blockManager.computeInvalidateWork(1);
       if (workCount == 1) {
         assertEquals(namesystem.blockInvalidateLimit+1, 
-            namesystem.computeInvalidateWork(2));        
+            namesystem.blockManager.computeInvalidateWork(2));
       } else {
         assertEquals(workCount, namesystem.blockInvalidateLimit);
-        assertEquals(2, namesystem.computeInvalidateWork(2));
+        assertEquals(2, namesystem.blockManager.computeInvalidateWork(2));
       }
       }
     } finally {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java Wed May  6 22:32:34 2009
@@ -62,16 +62,16 @@
       NumberReplicas num = null;
       do {
        synchronized (namesystem) {
-         num = namesystem.countNodes(block);
+         num = namesystem.blockManager.countNodes(block);
        }
       } while (num.excessReplicas() == 0);
       
      // find a non-excess node
-      Iterator<DatanodeDescriptor> iter = namesystem.blocksMap.nodeIterator(block);
+      Iterator<DatanodeDescriptor> iter = namesystem.blockManager.blocksMap.nodeIterator(block);
       DatanodeDescriptor nonExcessDN = null;
       while (iter.hasNext()) {
         DatanodeDescriptor dn = iter.next();
-        Collection<Block> blocks = namesystem.excessReplicateMap.get(dn.getStorageID());
+        Collection<Block> blocks = namesystem.blockManager.excessReplicateMap.get(dn.getStorageID());
         if (blocks == null || !blocks.contains(block) ) {
           nonExcessDN = dn;
           break;
@@ -89,7 +89,7 @@
       
       // The block should be replicated
       do {
-        num = namesystem.countNodes(block);
+        num = namesystem.blockManager.countNodes(block);
       } while (num.liveReplicas() != REPLICATION_FACTOR);
       
       // restart the first datanode
@@ -98,7 +98,7 @@
       
      // check if the excess replica is detected
       do {
-       num = namesystem.countNodes(block);
+       num = namesystem.blockManager.countNodes(block);
       } while (num.excessReplicas() == 2);
     } finally {
       cluster.shutdown();
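The loops in this test poll the replica counters until the namenode converges, but only the first loop above reads countNodes() under the namesystem lock. A version of the same polling pattern that holds the lock on every read would look like this (sketch; the helper name is hypothetical):

    // Hypothetical helper: poll countNodes() under the namesystem lock.
    private NumberReplicas countNodesLocked(FSNamesystem ns, Block b) {
      synchronized (ns) {
        return ns.blockManager.countNodes(b);
      }
    }
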

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java Wed May  6 22:32:34 2009
@@ -86,7 +86,7 @@
 
         // the corrupt replica won't be chosen as the excess one;
         // without 4910 the number of live replicas would be 0 and the block would be lost
-        assertEquals(1, namesystem.countNodes(block).liveReplicas());
+        assertEquals(1, namesystem.blockManager.countNodes(block).liveReplicas());
       }
     } finally {
       cluster.shutdown();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java Wed May  6 22:32:34 2009
@@ -63,7 +63,7 @@
       throw (RuntimeException)new RuntimeException().initCause(e);
     }
     FSNamesystem fsNamesystem = namenode.getNamesystem();
-    replicator = fsNamesystem.replicator;
+    replicator = fsNamesystem.blockManager.replicator;
     cluster = fsNamesystem.clusterMap;
     // construct network topology
     for(int i=0; i<NUM_OF_DATANODES; i++) {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestUnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestUnderReplicatedBlocks.java?rev=772450&r1=772449&r2=772450&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestUnderReplicatedBlocks.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestUnderReplicatedBlocks.java Wed May  6 22:32:34 2009
@@ -27,9 +27,9 @@
       // but the block does not get put into the under-replicated blocks queue
       final FSNamesystem namesystem = cluster.getNamesystem();
       Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
-      DatanodeDescriptor dn = namesystem.blocksMap.nodeIterator(b).next();
-      namesystem.addToInvalidates(b, dn);
-      namesystem.blocksMap.removeNode(b, dn);
+      DatanodeDescriptor dn = namesystem.blockManager.blocksMap.nodeIterator(b).next();
+      namesystem.blockManager.addToInvalidates(b, dn);
+      namesystem.blockManager.blocksMap.removeNode(b, dn);
       
       // increment this file's replication factor
       FsShell shell = new FsShell(conf);