You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2008/03/19 01:17:19 UTC

svn commit: r638655 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/

Author: shv
Date: Tue Mar 18 17:17:17 2008
New Revision: 638655

URL: http://svn.apache.org/viewvc?rev=638655&view=rev
Log:
HADOOP-2606. ReplicationMonitor selects data-nodes to replicate directly from needed replication blocks instead of looking up for the blocks for each live data-node. Contributed by Konstantin Shvachko.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Storage.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Mar 18 17:17:17 2008
@@ -123,6 +123,10 @@
     HADOOP-2423.  Code optimization in FSNamesystem.mkdirs.
     (Tsz Wo (Nicholas), SZE via dhruba)
 
+		HADOOP-2606. ReplicationMonitor selects data-nodes to replicate directly
+		from needed replication blocks instead of looking up for the blocks for 
+		each live data-node. (shv)
+
   BUG FIXES
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Mar 18 17:17:17 2008
@@ -97,7 +97,7 @@
   volatile boolean shouldRun = true;
   private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
   private LinkedList<String> delHints = new LinkedList<String>();
-  final private static String EMPTY_DEL_HINT = "";
+  final static String EMPTY_DEL_HINT = "";
   int xmitsInProgress = 0;
   Daemon dataXceiveServer = null;
   ThreadGroup threadGroup = null;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Tue Mar 18 17:17:17 2008
@@ -214,30 +214,25 @@
     return new BlockIterator(this.blockList, this);
   }
   
-  /*
+  /**
    * Store block replication work.
    */
-  void addBlocksToBeReplicated(Block[] blocklist, 
-                               DatanodeDescriptor[][] targets) {
-    assert(blocklist != null && targets != null);
-    assert(blocklist.length > 0 && targets.length > 0);
+  void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
+    assert(block != null && targets != null && targets.length > 0);
     synchronized (replicateBlocks) {
-      assert(blocklist.length == targets.length);
-      for (int i = 0; i < blocklist.length; i++) {
-        replicateBlocks.add(blocklist[i]);
-        replicateTargetSets.add(targets[i]);
-      }
+      replicateBlocks.add(block);
+      replicateTargetSets.add(targets);
     }
   }
 
-  /*
+  /**
    * Store block invalidation work.
    */
-  void addBlocksToBeInvalidated(Block[] blocklist) {
-    assert(blocklist != null && blocklist.length > 0);
+  void addBlocksToBeInvalidated(List<Block> blocklist) {
+    assert(blocklist != null && blocklist.size() > 0);
     synchronized (invalidateBlocks) {
-      for (int i = 0; i < blocklist.length; i++) {
-        invalidateBlocks.add(blocklist[i]);
+      for(Block blk : blocklist) {
+        invalidateBlocks.add(blk);
       }
     }
   }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Mar 18 17:17:17 2008
@@ -203,8 +203,11 @@
   private long decommissionRecheckInterval;
   // default block size of a file
   private long defaultBlockSize = 0;
-  private int replIndex = 0; // last datanode used for replication work
-  static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
+
+  /**
+   * Last block index used for replication work.
+   */
+  private int replIndex = 0;
 
   public static FSNamesystem fsNamesystemObject;
   private String localMachine;
@@ -392,10 +395,10 @@
         "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
     this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
       10 * heartbeatInterval;
-    this.replicationRecheckInterval = 3 * 1000; //  3 second
-    this.replicationRecheckInterval = conf.getInt("dfs.replication.interval", 3) * 1000;
-    this.decommissionRecheckInterval = conf.getInt("dfs.namenode.decommission.interval",
-                                                   5 * 60) * 1000;
+    this.replicationRecheckInterval = 
+      conf.getInt("dfs.replication.interval", 3) * 1000L;
+    this.decommissionRecheckInterval = 
+      conf.getInt("dfs.namenode.decommission.interval", 5 * 60) * 1000L;
     this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
     this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit, 
@@ -474,18 +477,14 @@
     synchronized (neededReplications) {
       out.println("Metasave: Blocks waiting for replication: " + 
                   neededReplications.size());
-      if (neededReplications.size() > 0) {
-        for (Iterator<Block> it = neededReplications.iterator(); 
-             it.hasNext();) {
-          Block block = it.next();
-          out.print(block);
-          for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-               jt.hasNext();) {
-            DatanodeDescriptor node = jt.next();
-            out.print(" " + node + " : ");
-          }
-          out.println("");
+      for (Block block : neededReplications) {
+        out.print(block);
+        for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+             jt.hasNext();) {
+          DatanodeDescriptor node = jt.next();
+          out.print(" " + node + " : ");
         }
+        out.println("");
       }
     }
 
@@ -2212,6 +2211,8 @@
    * Periodically calls computeReplicationWork().
    */
   class ReplicationMonitor implements Runnable {
+    static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
+    static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
     public void run() {
       while (fsRunning) {
         try {
@@ -2219,6 +2220,8 @@
           processPendingReplications();
           Thread.sleep(replicationRecheckInterval);
         } catch (InterruptedException ie) {
+          LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
+          break;
         } catch (IOException ie) {
           LOG.warn("ReplicationMonitor thread received exception. " + ie);
         } catch (Throwable t) {
@@ -2229,81 +2232,276 @@
     }
   }
 
+  /////////////////////////////////////////////////////////
+  //
+  // These methods are called by the Namenode system, to see
+  // if there is any work for registered datanodes.
+  //
+  /////////////////////////////////////////////////////////
   /**
-   * Look at a few datanodes and compute any replication work that 
-   * can be scheduled on them. The datanode will be infomed of this
-   * work at the next heartbeat.
-   */
-  void computeDatanodeWork() throws IOException {
-    int numiter = 0;
-    int foundwork = 0;
-    int hsize = 0;
-    int lastReplIndex = -1;
+   * Compute block replication and block invalidation work 
+   * that can be scheduled on data-nodes.
+   * The datanode will be informed of this work at the next heartbeat.
+   * 
+   * @return number of blocks scheduled for replication or removal.
+   */
+  int computeDatanodeWork() throws IOException {
+    int workFound = 0;
+    int blocksToProcess = 0;
+    int nodesToProcess = 0;
+    synchronized(heartbeats) {
+      blocksToProcess = (int)(heartbeats.size() 
+          * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
+      nodesToProcess = (int)Math.ceil((double)heartbeats.size() 
+          * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
+    }
+
+    workFound = computeReplicationWork(blocksToProcess); 
+    if(workFound == 0)
+      workFound = computeInvalidateWork(nodesToProcess);
+    return workFound;
+  }
+
+  private int computeInvalidateWork(int nodesToProcess) {
+    int blockCnt = 0;
+    for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
+      int work = invalidateWorkForOneNode();
+      if(work == 0)
+        break;
+      blockCnt += work;
+    }
+    return blockCnt;
+  }
 
-    while (true) {
-      DatanodeDescriptor node = null;
+  /**
+   * Scan blocks in {@link #neededReplications} and assign replication
+   * work to data-nodes they belong to. 
+   * 
+   * The number of process blocks equals either twice the number of live 
+   * data-nodes or the number of under-replicated blocks whichever is less.
+   * 
+   * @return number of blocks scheduled for replication during this iteration.
+   */
+  private synchronized int computeReplicationWork(
+                                  int blocksToProcess) throws IOException {
+    int scheduledReplicationCount = 0;
+    // blocks should not be replicated or removed if safe mode is on
+    if (isInSafeMode())
+      return scheduledReplicationCount;
 
-      //
-      // pick the datanode that was the last one in the
-      // previous invocation of this method.
-      //
-      synchronized (heartbeats) {
-        hsize = heartbeats.size();
-        if (numiter++ >= hsize) {
-          // no change in replIndex.
-          if (lastReplIndex >= 0) {
-            //next time, start after where the last replication was scheduled
-            replIndex = lastReplIndex;
-          }
-          break;
-        }
-        if (replIndex >= hsize) {
+    synchronized(neededReplications) {
+      // # of blocks to process equals either twice the number of live 
+      // data-nodes or the number of under-replicated blocks whichever is less
+      blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
+      if(blocksToProcess == 0)
+        return scheduledReplicationCount;
+
+      // Go through all blocks that need replications.
+      // Select source and target nodes for replication.
+      Iterator<Block> neededReplicationsIterator = neededReplications.iterator();
+      // skip to the first unprocessed block, which is at replIndex 
+      for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
+        neededReplicationsIterator.next();
+      }
+      // process blocks
+      for(int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
+        if( ! neededReplicationsIterator.hasNext()) {
+          // start from the beginning
           replIndex = 0;
-        }
-        node = heartbeats.get(replIndex);
-        replIndex++;
-      }
+          blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
+          if(blkCnt >= blocksToProcess)
+            break;
+          neededReplicationsIterator = neededReplications.iterator();
+          assert neededReplicationsIterator.hasNext() : 
+                                  "neededReplications should not be empty.";
+        }
+
+        Block block = neededReplicationsIterator.next();
+
+        // block should belong to a file
+        INodeFile fileINode = blocksMap.getINode(block);
+        if(fileINode == null) { // abandoned block 
+          neededReplicationsIterator.remove(); // remove from neededReplications
+          replIndex--;
+          continue;
+        }
+        int requiredReplication = fileINode.getReplication(); 
+
+        // get a source data-node
+        List<DatanodeDescriptor> containingNodes =
+                                          new ArrayList<DatanodeDescriptor>();
+        NumberReplicas numReplicas = new NumberReplicas();
+        DatanodeDescriptor srcNode = 
+          chooseSourceDatanode(block, containingNodes, numReplicas);
+        if(srcNode == null) // block can not be replicated from any node
+          continue;
+
+        // do not schedule more if enough replicas is already pending
+        int numEffectiveReplicas = numReplicas.liveReplicas() +
+                                pendingReplications.getNumReplicas(block);
+        if(numEffectiveReplicas >= requiredReplication) {
+          neededReplicationsIterator.remove(); // remove from neededReplications
+          replIndex--;
+          NameNode.stateChangeLog.info("BLOCK* "
+              + "Removing block " + block.getBlockName()
+              + " from neededReplications as it does not belong to any file.");
+          continue;
+        }
+
+        // choose replication targets
+        int maxTargets = 
+          maxReplicationStreams - srcNode.getNumberOfBlocksToBeReplicated();
+        assert maxTargets > 0 : "Datanode " + srcNode.getName() 
+              + " should have not been selected as a source for replication.";
+        DatanodeDescriptor targets[] = replicator.chooseTarget(
+            Math.min(requiredReplication - numEffectiveReplicas, maxTargets),
+            srcNode, containingNodes, null, block.getNumBytes());
+        if(targets.length == 0)
+          continue;
+        // Add block to the to be replicated list
+        srcNode.addBlockToBeReplicated(block, targets);
+        scheduledReplicationCount++;
 
-      //
-      // Is there replication work to be computed for this datanode?
-      //
-      int precomputed = node.getNumberOfBlocksToBeReplicated();
-      int needed = this.maxReplicationStreams - precomputed;
-      boolean doReplication = false;
-      boolean doInvalidation = false;
-      if (needed > 0) {
-        //
-        // Compute replication work and store work into the datanode
-        //
-        Object replsets[] = pendingTransfers(node, needed);
-        if (replsets != null) {
-          doReplication = true;
-          addBlocksToBeReplicated(node, (Block[])replsets[0], 
-                                  (DatanodeDescriptor[][])replsets[1]);
-          lastReplIndex = replIndex;
+        // Move the block-replication into a "pending" state.
+        // The reason we use 'pending' is so we can retry
+        // replications that fail after an appropriate amount of time.
+        if(numEffectiveReplicas + targets.length >= requiredReplication) {
+          neededReplicationsIterator.remove(); // remove from neededReplications
+          replIndex--;
+          pendingReplications.add(block, targets.length);
+          NameNode.stateChangeLog.debug(
+              "BLOCK* block " + block.getBlockName()
+              + " is moved from neededReplications to pendingReplications");
+        }
+        if (NameNode.stateChangeLog.isInfoEnabled()) {
+          StringBuffer targetList = new StringBuffer("datanode(s)");
+          for (int k = 0; k < targets.length; k++) {
+            targetList.append(' ');
+            targetList.append(targets[k].getName());
+          }
+          NameNode.stateChangeLog.info(
+                    "BLOCK* ask "
+                    + srcNode.getName() + " to replicate "
+                    + block.getBlockName() + " to " + targetList);
+          NameNode.stateChangeLog.debug(
+                    "BLOCK* neededReplications = " + neededReplications.size()
+                    + " pendingReplications = " + pendingReplications.size());
         }
       }
-      if (!doReplication) {
-        //
-        // Determine if block deletion is pending for this datanode
-        //
-        Block blocklist[] = blocksToInvalidate(node);
-        if (blocklist != null) {
-          doInvalidation = true;
-          addBlocksToBeInvalidated(node, blocklist);
-        }
+    }
+    return scheduledReplicationCount;
+  }
+
+  /**
+   * Parse the data-nodes the block belongs to and choose one,
+   * which will be the replication source.
+   * 
+   * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
+   * since the former do not have write traffic and hence are less busy.
+   * We do not use already decommissioned nodes as a source.
+   * Otherwise we choose a random node among those that did not reach their 
+   * replication limit.
+   * 
+   * In addition form a list of all nodes containing the block
+   * and calculate its replication numbers.
+   */
+  private DatanodeDescriptor chooseSourceDatanode(
+                                    Block block,
+                                    List<DatanodeDescriptor> containingNodes,
+                                    NumberReplicas numReplicas) {
+    containingNodes.clear();
+    DatanodeDescriptor srcNode = null;
+    int live = 0;
+    int decommissioned = 0;
+    Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
+    while(it.hasNext()) {
+      DatanodeDescriptor node = it.next();
+      if(!node.isDecommissionInProgress() && !node.isDecommissioned())
+        live++;
+      else
+        decommissioned++;
+      containingNodes.add(node);
+      if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+        continue; // already reached replication limit
+      // the block must not be scheduled for removal on srcNode
+      Collection<Block> excessBlocks = 
+        excessReplicateMap.get(node.getStorageID());
+      if(excessBlocks != null && excessBlocks.contains(block))
+        continue;
+      // never use already decommissioned nodes
+      if(node.isDecommissioned())
+        continue;
+      // we prefer nodes that are in DECOMMISSION_INPROGRESS state
+      if(node.isDecommissionInProgress() || srcNode == null) {
+        srcNode = node;
+        continue;
+      }
+      if(srcNode.isDecommissionInProgress())
+        continue;
+      // switch to a different node randomly
+      // this to prevent from deterministically selecting the same node even
+      // if the node failed to replicate the block on previous iterations
+      if(r.nextBoolean())
+        srcNode = node;
+    }
+    if(numReplicas != null)
+      numReplicas.initialize(live, decommissioned);
+    return srcNode;
+  }
+
+  /**
+   * Get blocks to invalidate for the first node 
+   * in {@link #recentInvalidateSets}.
+   * 
+   * @return number of blocks scheduled for removal during this iteration.
+   */
+  private synchronized int invalidateWorkForOneNode() {
+    // blocks should not be replicated or removed if safe mode is on
+    if (isInSafeMode())
+      return 0;
+    if(recentInvalidateSets.isEmpty())
+      return 0;
+    // get blocks to invalidate for the first node
+    String firstNodeId = recentInvalidateSets.keySet().iterator().next();
+    assert firstNodeId != null;
+    DatanodeDescriptor dn = datanodeMap.get(firstNodeId);
+    Collection<Block> invalidateSet = recentInvalidateSets.remove(firstNodeId);
+ 
+    if(invalidateSet == null || dn == null)
+      return 0;
+
+    ArrayList<Block> blocksToInvalidate = 
+      new ArrayList<Block>(blockInvalidateLimit);
+
+    // # blocks that can be sent in one message is limited
+    Iterator<Block> it = invalidateSet.iterator();
+    for(int blkCount = 0; blkCount < blockInvalidateLimit && it.hasNext();
+                                                                blkCount++) {
+      blocksToInvalidate.add(it.next());
+      it.remove();
+    }
+
+    // If we could not send everything in this message, reinsert this item
+    // into the collection.
+    if(it.hasNext())
+      recentInvalidateSets.put(firstNodeId, invalidateSet);
+
+    dn.addBlocksToBeInvalidated(blocksToInvalidate);
+
+    if(NameNode.stateChangeLog.isInfoEnabled()) {
+      StringBuffer blockList = new StringBuffer();
+      for(Block blk : blocksToInvalidate) {
+        blockList.append(' ');
+        blockList.append(blk.getBlockName());
       }
-      if (doReplication || doInvalidation) {
-        //
-        // If we have already computed work for a predefined
-        // number of datanodes in this iteration, then relax
-        //
-        if (foundwork > ((hsize * REPL_WORK_PER_ITERATION)/100)) {
-          break;
-        }
-        foundwork++;
-      } 
+      NameNode.stateChangeLog.info("BLOCK* ask "
+          + dn.getName() + " to delete " + blockList);
     }
+    return blocksToInvalidate.size();
+  }
+
+  void setNodeReplicationLimit(int limit) {
+    this.maxReplicationStreams = limit;
   }
 
   /**
@@ -2326,36 +2524,6 @@
   }
 
   /**
-   * Add more replication work for this datanode.
-   */
-  synchronized void addBlocksToBeReplicated(DatanodeDescriptor node, 
-                                            Block[] blocklist,
-                                            DatanodeDescriptor[][] targets) 
-    throws IOException {
-    //
-    // Find the datanode with the FSNamesystem lock held.
-    //
-    DatanodeDescriptor n = getDatanode(node);
-    if (n != null) {
-      n.addBlocksToBeReplicated(blocklist, targets);
-    }
-  }
-
-  /**
-   * Add more block invalidation work for this datanode.
-   */
-  synchronized void addBlocksToBeInvalidated(DatanodeDescriptor node, 
-                                             Block[] blocklist) throws IOException {
-    //
-    // Find the datanode with the FSNamesystem lock held.
-    //
-    DatanodeDescriptor n = getDatanode(node);
-    if (n != null) {
-      n.addBlocksToBeInvalidated(blocklist);
-    }
-  }
-
-  /**
    * remove a datanode descriptor
    * @param nodeID datanode ID
    */
@@ -3125,78 +3293,23 @@
   short getMinReplication()     { return (short)minReplication; }
   short getDefaultReplication() { return (short)defaultReplication; }
     
-  /////////////////////////////////////////////////////////
-  //
-  // These methods are called by the Namenode system, to see
-  // if there is any work for a given datanode.
-  //
-  /////////////////////////////////////////////////////////
-
-  /**
-   * Check if there are any recently-deleted blocks a datanode should remove.
-   */
-  public synchronized Block[] blocksToInvalidate(DatanodeID nodeID) {
-    // Ask datanodes to perform block delete  
-    // only if safe mode is off.
-    if (isInSafeMode())
-      return null;
-       
-    Collection<Block> invalidateSet = recentInvalidateSets.remove(
-                                                                  nodeID.getStorageID());
- 
-    if (invalidateSet == null) {
-      return null;
-    }
-
-    Iterator<Block> it = null;
-    int sendNum = invalidateSet.size();
-    ArrayList<Block> sendBlock = new ArrayList<Block>(sendNum);
-
-    //
-    // calculate the number of blocks that we send in one message
-    //
-    sendNum = Math.min(sendNum, blockInvalidateLimit);
-    
-    //
-    // Copy the first chunk into sendBlock
-    //
-    for (it = invalidateSet.iterator(); sendNum > 0; sendNum--) {
-      assert(it.hasNext());
-      sendBlock.add(it.next());
-      it.remove();
-    }
-
-    //
-    // If we could not send everything in this message, reinsert this item
-    // into the collection.
-    //
-    if (it.hasNext()) {
-      recentInvalidateSets.put(nodeID.getStorageID(), invalidateSet);
-    }
-        
-    if (NameNode.stateChangeLog.isInfoEnabled()) {
-      StringBuffer blockList = new StringBuffer();
-      for (int i = 0; i < sendBlock.size(); i++) {
-        blockList.append(' ');
-        Block block = sendBlock.get(i);
-        blockList.append(block.getBlockName());
-      }
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.blockToInvalidate: "
-                                   +"ask "+nodeID.getName()+" to delete " + blockList);
-    }
-    return sendBlock.toArray(new Block[sendBlock.size()]);
-  }
-
-
   /**
    * A immutable object that stores the number of live replicas and
    * the number of decommissined Replicas.
    */
-  static class NumberReplicas {
+  private static class NumberReplicas {
     private int liveReplicas;
     private int decommissionedReplicas;
 
+    NumberReplicas() {
+      initialize(0, 0);
+    }
+
     NumberReplicas(int live, int decommissioned) {
+      initialize(live, decommissioned);
+    }
+
+    void initialize(int live, int decommissioned) {
       liveReplicas = live;
       decommissionedReplicas = decommissioned;
     }
@@ -3236,32 +3349,6 @@
   }
 
   /**
-   * Returns a newly allocated list of all nodes. Returns a count of
-   * live and decommissioned nodes.
-   */
-  ArrayList<DatanodeDescriptor> containingNodeList(Block b, NumberReplicas[] numReplicas) {
-    ArrayList<DatanodeDescriptor> nodeList = 
-      new ArrayList<DatanodeDescriptor>();
-    int count = 0;
-    int live = 0;
-    for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
-        it.hasNext();) {
-      DatanodeDescriptor node = it.next();
-      if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-        live++;
-      }
-      else {
-        count++;
-      }
-      nodeList.add(node);
-    }
-    if (numReplicas != null) {
-      numReplicas[0] = new NumberReplicas(live, count);
-    }
-    return nodeList;
-  }
-
-  /**
    * Return true if there are any blocks on this node that have not
    * yet reached their replication factor. Otherwise returns false.
    */
@@ -3318,140 +3405,6 @@
     return false;
   }
 
-  /**
-   * Return with a list of Block/DataNodeInfo sets, indicating
-   * where various Blocks should be copied, ASAP.
-   *
-   * The Array that we return consists of two objects:
-   * The 1st elt is an array of Blocks.
-   * The 2nd elt is a 2D array of DatanodeDescriptor objs, identifying the
-   *     target sequence for the Block at the appropriate index.
-   *
-   */
-  public synchronized Object[] pendingTransfers(DatanodeID srcNode,
-                                                int needed) {
-    // Ask datanodes to perform block replication  
-    // only if safe mode is off.
-    if (isInSafeMode())
-      return null;
-    
-    synchronized (neededReplications) {
-      Object results[] = null;
-
-      if (neededReplications.size() > 0) {
-        //
-        // Go through all blocks that need replications. See if any
-        // are present at the current node. If so, ask the node to
-        // replicate them.
-        //
-        List<Block> replicateBlocks = new ArrayList<Block>();
-        List<NumberReplicas> numCurrentReplicas = new ArrayList<NumberReplicas>();
-        List<DatanodeDescriptor[]> replicateTargetSets;
-        replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
-        NumberReplicas[] allReplicas = new NumberReplicas[1];
-        for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();) {
-          if (needed <= 0) {
-            break;
-          }
-          Block block = it.next();
-          long blockSize = block.getNumBytes();
-          INodeFile fileINode = blocksMap.getINode(block);
-          if (fileINode == null) { // block does not belong to any file
-            it.remove();
-          } else {
-            List<DatanodeDescriptor> containingNodes = 
-              containingNodeList(block, allReplicas);
-            Collection<Block> excessBlocks = excessReplicateMap.get(
-                                                                    srcNode.getStorageID());
-
-            // srcNode must contain the block, and the block must
-            // not be scheduled for removal on that node
-            if (containingNodes.contains(srcNode)
-                && (excessBlocks == null || !excessBlocks.contains(block))) {
-              int numCurrentReplica = allReplicas[0].liveReplicas() +
-                pendingReplications.getNumReplicas(block);
-              NumberReplicas repl = new NumberReplicas(numCurrentReplica,
-                                        allReplicas[0].decommissionedReplicas()); 
-              if (numCurrentReplica >= fileINode.getReplication()) {
-                it.remove();
-              } else {
-                DatanodeDescriptor targets[] = replicator.chooseTarget(
-                                                                       Math.min(fileINode.getReplication() - numCurrentReplica,
-                                                                                needed),
-                                                                       datanodeMap.get(srcNode.getStorageID()),
-                                                                       containingNodes, null, blockSize);
-                if (targets.length > 0) {
-                  // Build items to return
-                  replicateBlocks.add(block);
-                  numCurrentReplicas.add(repl);
-                  replicateTargetSets.add(targets);
-                  needed -= targets.length;
-                }
-              }
-            }
-          }
-        }
-
-        //
-        // Move the block-replication into a "pending" state.
-        // The reason we use 'pending' is so we can retry
-        // replications that fail after an appropriate amount of time.
-        // (REMIND - mjc - this timer is not yet implemented.)
-        //
-        if (replicateBlocks.size() > 0) {
-          int i = 0;
-          for (Iterator<Block> it = replicateBlocks.iterator(); it.hasNext(); i++) {
-            Block block = it.next();
-            DatanodeDescriptor targets[] = replicateTargetSets.get(i);
-            int numCurrentReplica = numCurrentReplicas.get(i).liveReplicas();
-            int numExpectedReplica = blocksMap.getINode(block).getReplication(); 
-            if (numCurrentReplica + targets.length >= numExpectedReplica) {
-              neededReplications.remove(
-                                        block, 
-                                        numCurrentReplica, 
-                                        numCurrentReplicas.get(i).decommissionedReplicas(),
-                                        numExpectedReplica);
-              pendingReplications.add(block, targets.length);
-              NameNode.stateChangeLog.debug(
-                                            "BLOCK* NameSystem.pendingTransfer: "
-                                            + block.getBlockName()
-                                            + " is removed from neededReplications to pendingReplications");
-            }
-
-            if (NameNode.stateChangeLog.isInfoEnabled()) {
-              StringBuffer targetList = new StringBuffer("datanode(s)");
-              for (int k = 0; k < targets.length; k++) {
-                targetList.append(' ');
-                targetList.append(targets[k].getName());
-              }
-              NameNode.stateChangeLog.info(
-                                           "BLOCK* NameSystem.pendingTransfer: " + "ask "
-                                           + srcNode.getName() + " to replicate "
-                                           + block.getBlockName() + " to " + targetList);
-              NameNode.stateChangeLog.debug(
-                                            "BLOCK* neededReplications = " + neededReplications.size()
-                                            + " pendingReplications = " + pendingReplications.size());
-            }
-          }
-
-          //
-          // Build returned objects from above lists
-          //
-          DatanodeDescriptor targetMatrix[][] = 
-            new DatanodeDescriptor[replicateTargetSets.size()][];
-          for (i = 0; i < targetMatrix.length; i++) {
-            targetMatrix[i] = replicateTargetSets.get(i);
-          }
-
-          results = new Object[2];
-          results[0] = replicateBlocks.toArray(new Block[replicateBlocks.size()]);
-          results[1] = targetMatrix;
-        }
-      }
-      return results;
-    }
-  }
-  
   /** 
    * Keeps track of which datanodes are allowed to connect to the namenode.
    */

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Tue Mar 18 17:17:17 2008
@@ -100,7 +100,7 @@
   /**
    * The total number of blocks that are undergoing replication
    */
-  long size() {
+  int size() {
     return pendingReplications.size();
   } 
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Storage.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Storage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Storage.java Tue Mar 18 17:17:17 2008
@@ -298,7 +298,7 @@
           return StorageState.NORMAL;
         if (hasPrevious)
           throw new InconsistentFSStateException(root,
-                                                 "version file in current directory it is missing.");
+                              "version file in current directory is missing.");
         return StorageState.NOT_FORMATTED;
       }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java Tue Mar 18 17:17:17 2008
@@ -23,7 +23,7 @@
  * Blocks have replication priority, with priority 0 indicating the highest
  * Blocks have only one replicas has the highest
  */
-class UnderReplicatedBlocks {
+class UnderReplicatedBlocks implements Iterable<Block> {
   private static final int LEVEL = 3;
   private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
       
@@ -173,7 +173,7 @@
   }
       
   /* return a iterator of all the under replication blocks */
-  synchronized Iterator<Block> iterator() {
+  public synchronized Iterator<Block> iterator() {
     return new Iterator<Block>() {
       private int level;
       private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java Tue Mar 18 17:17:17 2008
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.dfs;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
@@ -77,6 +79,14 @@
     // We do not need many handlers, since each thread simulates a handler
     // by calling name-node methods directly
     config.setInt("dfs.namenode.handler.count", 1);
+    // set exclude file
+    config.set("dfs.hosts.exclude", "${hadoop.tmp.dir}/dfs/hosts/exclude");
+    File excludeFile = new File(config.get("dfs.hosts.exclude", "exclude"));
+    if(! excludeFile.exists()) {
+      if(!excludeFile.getParentFile().mkdirs())
+        throw new IOException("NNThroughputBenchmark: cannot mkdir " + excludeFile);
+    }
+    new FileOutputStream(excludeFile).close();
     // Start the NameNode
     String[] args = new String[] {};
     nameNode = NameNode.createNameNode(args, config);
@@ -113,6 +123,8 @@
     protected long cumulativeTime = 0;    // sum of times for each op
     protected long elapsedTime = 0;       // time from start to finish
 
+    protected List<StatsDaemon> daemons;
+
     /**
      * Operation name.
      */
@@ -155,6 +167,11 @@
      */
     abstract long executeOp(int daemonId, int inputIdx, String arg1) throws IOException;
 
+    /**
+     * Print the results of the benchmarking.
+     */
+    abstract void printResults();
+
     OperationStatsBase() {
       baseDir = BASE_DIR_NAME + "/" + getOpName();
       replication = (short) config.getInt("dfs.replication", 3);
@@ -163,7 +180,7 @@
     }
 
     void benchmark() throws IOException {
-      List<StatsDaemon> daemons = new ArrayList<StatsDaemon>();
+      daemons = new ArrayList<StatsDaemon>();
       long start = 0;
       try {
         numOpsExecuted = 0;
@@ -191,7 +208,7 @@
         for(StatsDaemon d : daemons)
           d.start();
       } finally {
-        while(isInPorgress(daemons)) {
+        while(isInPorgress()) {
           // try {Thread.sleep(500);} catch (InterruptedException e) {}
         }
         elapsedTime = System.currentTimeMillis() - start;
@@ -202,9 +219,9 @@
       }
     }
 
-    private boolean isInPorgress(List<StatsDaemon> daemons) {
+    private boolean isInPorgress() {
       for(StatsDaemon d : daemons)
-        if(d.isInPorgress())
+        if(d.isInProgress())
           return true;
       return false;
     }
@@ -269,7 +286,7 @@
       return false;
     }
 
-    void printResults() {
+    void printStats() {
       LOG.info("--- " + getOpName() + " stats  ---");
       LOG.info("# operations: " + getNumOpsExecuted());
       LOG.info("Elapsed Time: " + getElapsedTime());
@@ -293,7 +310,6 @@
       this.daemonId = daemonId;
       this.opsPerThread = nrOps;
       this.statsOp = op;
-      // this.clientName = statsOp.getClientName(daemonId);
       setName(toString());
     }
 
@@ -322,9 +338,16 @@
       }
     }
 
-    boolean isInPorgress() {
+    boolean isInProgress() {
       return localNumOpsExecuted < opsPerThread;
     }
+
+    /**
+     * Schedule to stop this daemon.
+     */
+    void terminate() {
+      opsPerThread = localNumOpsExecuted;
+    }
   }
 
   /**
@@ -379,10 +402,8 @@
       long fNum = fileCount % filesPerDirectory;
       if(fNum == 0) {
         currentDir = getNextDirName();
-        // System.out.println("currentDir: " + currentDir);
       }
       String fn = currentDir + "/" + FILE_NAME_PREFFIX + fileCount;
-      // System.out.println("getNextFileName(): " + fn + " fileCount = " + fileCount);
       fileCount++;
       return fn;
     }
@@ -481,7 +502,7 @@
       LOG.info("nrFiles = " + numOpsRequired);
       LOG.info("nrThreads = " + numThreads);
       LOG.info("nrFilesPerDir = " + nameGenerator.filesPerDirectory);
-      super.printResults();
+      printStats();
     }
   }
 
@@ -538,6 +559,8 @@
   private static class TinyDatanode implements Comparable<String> {
     private static final long DF_CAPACITY = 100*1024*1024;
     private static final long DF_USED = 0;
+    
+    NamespaceInfo nsInfo;
     DatanodeRegistration dnRegistration;
     Block[] blocks;
     int nrBlocks; // actual number of blocks
@@ -563,15 +586,23 @@
       this.nrBlocks = 0;
     }
 
+    String getName() {
+      return dnRegistration.getName();
+    }
+
     void register() throws IOException {
       // get versions from the namenode
-      NamespaceInfo nsInfo = nameNode.versionRequest();
+      nsInfo = nameNode.versionRequest();
       dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
       DataNode.setNewStorageID(dnRegistration);
       // register datanode
       dnRegistration = nameNode.register(dnRegistration);
     }
 
+    /**
+     * Send a heartbeat to the name-node.
+     * Ignore reply commands.
+     */
     void sendHeartbeat() throws IOException {
       // register datanode
       DatanodeCommand cmd = nameNode.sendHeartbeat(
@@ -597,7 +628,46 @@
     }
 
     public int compareTo(String name) {
-      return dnRegistration.getName().compareTo(name);
+      return getName().compareTo(name);
+    }
+
+    /**
+     * Send a heartbeat to the name-node and replicate blocks if requested.
+     */
+    int replicateBlocks() throws IOException {
+      // register datanode
+      DatanodeCommand cmd = nameNode.sendHeartbeat(
+          dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
+      if(cmd == null || cmd.getAction() != DatanodeProtocol.DNA_TRANSFER)
+        return 0;
+      // Send a copy of a block to another datanode
+      BlockCommand bcmd = (BlockCommand)cmd;
+      return transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
+    }
+
+    /**
+     * Transfer blocks to another data-node.
+     * Just report on behalf of the other data-node
+     * that the blocks have been received.
+     */
+    private int transferBlocks( Block blocks[], 
+                                DatanodeInfo xferTargets[][] 
+                              ) throws IOException {
+      for(int i = 0; i < blocks.length; i++) {
+        DatanodeInfo blockTargets[] = xferTargets[i];
+        for(int t = 0; t < blockTargets.length; t++) {
+          DatanodeInfo dnInfo = blockTargets[t];
+          DatanodeRegistration receivedDNReg;
+          receivedDNReg = new DatanodeRegistration(dnInfo.getName());
+          receivedDNReg.setStorageInfo(
+                          new DataStorage(nsInfo, dnInfo.getStorageID()));
+          receivedDNReg.setInfoPort(dnInfo.getInfoPort());
+          nameNode.blockReceived( receivedDNReg, 
+                                  new Block[] {blocks[i]},
+                                  new String[] {DataNode.EMPTY_DEL_HINT});
+        }
+      }
+      return blocks.length;
     }
   }
 
@@ -611,7 +681,8 @@
   class BlockReportStats extends OperationStatsBase {
     static final String OP_BLOCK_REPORT_NAME = "blockReport";
     static final String OP_BLOCK_REPORT_USAGE = 
-      "-op blockReport [-datanodes T] [-reports R] [-blocksPerReport B] [-blocksPerFile F]";
+      "-op blockReport [-datanodes T] [-reports N] " +
+      "[-blocksPerReport B] [-blocksPerFile F]";
 
     private int blocksPerReport;
     private int blocksPerFile;
@@ -670,10 +741,10 @@
       for(int idx=0; idx < nrDatanodes; idx++) {
         datanodes[idx] = new TinyDatanode(idx, blocksPerReport);
         datanodes[idx].register();
-        assert datanodes[idx].dnRegistration.getName().compareTo(prevDNName) > 0
+        assert datanodes[idx].getName().compareTo(prevDNName) > 0
           : "Data-nodes must be sorted lexicographically.";
         datanodes[idx].sendHeartbeat();
-        prevDNName = datanodes[idx].dnRegistration.getName();
+        prevDNName = datanodes[idx].getName();
       }
       int numResolved = 0;
       DatanodeInfo[] dnInfos = nameNode.getDatanodeReport(DatanodeReportType.ALL);
@@ -741,14 +812,6 @@
       return end-start;
     }
 
-    /**
-     * Defines data-node name since client are data-nodes in this case.
-     */
-    @Override
-    String getClientName(int idx) {
-      return getOpName() + "-client-" + idx;
-    }
-
     void printResults() {
       String blockDistribution = "";
       String delim = "(";
@@ -762,9 +825,164 @@
       LOG.info("datanodes = " + numThreads + " " + blockDistribution);
       LOG.info("blocksPerReport = " + blocksPerReport);
       LOG.info("blocksPerFile = " + blocksPerFile);
-      super.printResults();
+      printStats();
     }
-  }
+  }   // end BlockReportStats
+
+  /**
+   * Measures how fast replication monitor can compute data-node work.
+   * 
+   * It runs only one thread until no more work can be scheduled.
+   */
+  class ReplicationStats extends OperationStatsBase {
+    static final String OP_REPLICATION_NAME = "replication";
+    static final String OP_REPLICATION_USAGE = 
+      "-op replication [-datanodes T] [-nodesToDecommission D] " +
+      "[-nodeReplicationLimit C] [-totalBlocks B] [-replication R]";
+
+    private BlockReportStats blockReportObject;
+    private int numDatanodes;
+    private int nodesToDecommission;
+    private int nodeReplicationLimit;
+    private int totalBlocks;
+    private int numDecommissionedBlocks;
+    private int numPendingBlocks;
+
+    ReplicationStats(String[] args) {
+      super();
+      numThreads = 1;
+      numDatanodes = 3;
+      nodesToDecommission = 1;
+      nodeReplicationLimit = 100;
+      totalBlocks = 100;
+      parseArguments(args);
+      // number of operations is 4 times the number of decommissioned
+      // blocks divided by the number of needed replications scanned 
+      // by the replication monitor in one iteration
+      numOpsRequired = (totalBlocks*replication*nodesToDecommission*2)
+            / (numDatanodes*numDatanodes);
+
+      String[] blkReportArgs = {
+        "-op", "blockReport",
+        "-datanodes", String.valueOf(numDatanodes),
+        "-blocksPerReport", String.valueOf(totalBlocks*replication/numDatanodes),
+        "-blocksPerFile", String.valueOf(numDatanodes)};
+      blockReportObject = new BlockReportStats(blkReportArgs);
+      numDecommissionedBlocks = 0;
+      numPendingBlocks = 0;
+    }
+
+    String getOpName() {
+      return OP_REPLICATION_NAME;
+    }
+
+    void parseArguments(String[] args) {
+      boolean ignoreUnrelatedOptions = verifyOpArgument(args);
+      for (int i = 2; i < args.length; i++) {       // parse command line
+        if(args[i].equals("-datanodes")) {
+          if(i+1 == args.length)  printUsage();
+          numDatanodes = Integer.parseInt(args[++i]);
+        } else if(args[i].equals("-nodesToDecommission")) {
+          if(i+1 == args.length)  printUsage();
+          nodesToDecommission = Integer.parseInt(args[++i]);
+        } else if(args[i].equals("-nodeReplicationLimit")) {
+          if(i+1 == args.length)  printUsage();
+          nodeReplicationLimit = Integer.parseInt(args[++i]);
+        } else if(args[i].equals("-totalBlocks")) {
+          if(i+1 == args.length)  printUsage();
+          totalBlocks = Integer.parseInt(args[++i]);
+        } else if(args[i].equals("-replication")) {
+          if(i+1 == args.length)  printUsage();
+          replication = Short.parseShort(args[++i]);
+        } else if(!ignoreUnrelatedOptions)
+          printUsage();
+      }
+    }
+
+    void generateInputs(int[] ignore) throws IOException {
+      // start data-nodes; create a bunch of files; generate block reports.
+      blockReportObject.generateInputs(ignore);
+      // stop replication monitor
+      nameNode.namesystem.replthread.interrupt();
+      try {
+        nameNode.namesystem.replthread.join();
+      } catch(InterruptedException ei) {
+        return;
+      }
+      // report blocks once
+      int nrDatanodes = blockReportObject.getNumDatanodes();
+      for(int idx=0; idx < nrDatanodes; idx++) {
+        blockReportObject.executeOp(idx, 0, null);
+      }
+      // decommission data-nodes
+      decommissionNodes();
+      // set node replication limit
+      nameNode.namesystem.setNodeReplicationLimit(nodeReplicationLimit);
+    }
+
+    private void decommissionNodes() throws IOException {
+      String excludeFN = config.get("dfs.hosts.exclude", "exclude");
+      FileOutputStream excludeFile = new FileOutputStream(excludeFN);
+      excludeFile.getChannel().truncate(0L);
+      int nrDatanodes = blockReportObject.getNumDatanodes();
+      numDecommissionedBlocks = 0;
+      for(int i=0; i < nodesToDecommission; i++) {
+        TinyDatanode dn = blockReportObject.datanodes[nrDatanodes-1-i];
+        numDecommissionedBlocks += dn.nrBlocks;
+        excludeFile.write(dn.getName().getBytes());
+        excludeFile.write('\n');
+        LOG.info("Datanode " + dn.getName() + " is decommissioned.");
+      }
+      excludeFile.close();
+      nameNode.refreshNodes();
+    }
+
+    /**
+     * Does not require the argument
+     */
+    String getExecutionArgument(int daemonId) {
+      return null;
+    }
+
+    long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
+      assert daemonId < numThreads : "Wrong daemonId.";
+      long start = System.currentTimeMillis();
+      // compute datanode work
+      int work = nameNode.namesystem.computeDatanodeWork();
+      long end = System.currentTimeMillis();
+      numPendingBlocks += work;
+      if(work == 0)
+        daemons.get(daemonId).terminate();
+      return end-start;
+    }
+
+    void printResults() {
+      String blockDistribution = "";
+      String delim = "(";
+      int totalReplicas = 0;
+      for(int idx=0; idx < blockReportObject.getNumDatanodes(); idx++) {
+        totalReplicas += blockReportObject.datanodes[idx].nrBlocks;
+        blockDistribution += delim + blockReportObject.datanodes[idx].nrBlocks;
+        delim = ", ";
+      }
+      blockDistribution += ")";
+      LOG.info("--- " + getOpName() + " inputs ---");
+      LOG.info("numOpsRequired = " + numOpsRequired);
+      LOG.info("datanodes = " + numDatanodes + " " + blockDistribution);
+      LOG.info("decommissioned datanodes = " + nodesToDecommission);
+      LOG.info("datanode replication limit = " + nodeReplicationLimit);
+      LOG.info("total blocks = " + totalBlocks);
+      printStats();
+      LOG.info("decommissioned blocks = " + numDecommissionedBlocks);
+      LOG.info("pending replications = " + numPendingBlocks);
+      LOG.info("replications per sec: " + getBlocksPerSecond());
+    }
+
+    private double getBlocksPerSecond() {
+      return elapsedTime == 0 ? 0 : 1000*(double)numPendingBlocks / elapsedTime;
+    }
+
+  }   // end ReplicationStats
 
   static void printUsage() {
     System.err.println("Usage: NNThroughputBenchmark"
@@ -772,6 +990,7 @@
         + " | \n\t" + CreateFileStats.OP_CREATE_USAGE
         + " | \n\t" + OpenFileStats.OP_OPEN_USAGE
         + " | \n\t" + BlockReportStats.OP_BLOCK_REPORT_USAGE
+        + " | \n\t" + ReplicationStats.OP_REPLICATION_USAGE
     );
     System.exit(-1);
   }
@@ -802,6 +1021,10 @@
       }
       if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) {
         opStat = bench.new BlockReportStats(args);
+        ops.add(opStat);
+      }
+      if(runAll || ReplicationStats.OP_REPLICATION_NAME.equals(type)) {
+        opStat = bench.new ReplicationStats(args);
         ops.add(opStat);
       }
       if(ops.size() == 0)