Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/12/05 20:49:16 UTC

svn commit: r601491 - in /lucene/hadoop/trunk: ./ bin/ conf/ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/

Author: dhruba
Date: Wed Dec  5 11:49:16 2007
New Revision: 601491

URL: http://svn.apache.org/viewvc?rev=601491&view=rev
Log:
HADOOP-1652.  A utility to balance data among datanodes in a HDFS cluster.
(Hairong Kuang via dhruba)


Added:
    lucene/hadoop/trunk/bin/start-balancer.sh   (with props)
    lucene/hadoop/trunk/bin/stop-balancer.sh   (with props)
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java   (with props)
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java   (with props)
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/bin/hadoop
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Dec  5 11:49:16 2007
@@ -37,6 +37,9 @@
 
     HADOOP-2299.  Definition of a login interface.  A simple implementation for
     Unix users and groups. (Hairong Kuang via dhruba)
+
+    HADOOP-1652.  A utility to balance data among datanodes in a HDFS cluster.
+    (Hairong Kuang via dhruba)
     
   IMPROVEMENTS
 

Modified: lucene/hadoop/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/hadoop?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop (original)
+++ lucene/hadoop/trunk/bin/hadoop Wed Dec  5 11:49:16 2007
@@ -40,6 +40,7 @@
   echo "  dfsadmin             run a DFS admin client"
   echo "  fsck                 run a DFS filesystem checking utility"
   echo "  fs                   run a generic filesystem user client"
+  echo "  balancer             run a cluster balancing utility"
   echo "  jobtracker           run the MapReduce job Tracker node" 
   echo "  pipes                run a Pipes job"
   echo "  tasktracker          run a MapReduce task Tracker node" 
@@ -180,6 +181,8 @@
   CLASS=org.apache.hadoop.dfs.DFSAdmin
 elif [ "$COMMAND" = "fsck" ] ; then
   CLASS=org.apache.hadoop.dfs.DFSck
+elif [ "$COMMAND" = "balancer" ] ; then
+  CLASS=org.apache.hadoop.dfs.Balancer
 elif [ "$COMMAND" = "jobtracker" ] ; then
   CLASS=org.apache.hadoop.mapred.JobTracker
 elif [ "$COMMAND" = "tasktracker" ] ; then

Added: lucene/hadoop/trunk/bin/start-balancer.sh
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/start-balancer.sh?rev=601491&view=auto
==============================================================================
--- lucene/hadoop/trunk/bin/start-balancer.sh (added)
+++ lucene/hadoop/trunk/bin/start-balancer.sh Wed Dec  5 11:49:16 2007
@@ -0,0 +1,5 @@
+#!/usr/bin/env bash
+
+# Start balancer daemon.
+
+hadoop-daemon.sh start balancer $@

Propchange: lucene/hadoop/trunk/bin/start-balancer.sh
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/bin/start-balancer.sh
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/bin/start-balancer.sh
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/bin/stop-balancer.sh
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/stop-balancer.sh?rev=601491&view=auto
==============================================================================
--- lucene/hadoop/trunk/bin/stop-balancer.sh (added)
+++ lucene/hadoop/trunk/bin/stop-balancer.sh Wed Dec  5 11:49:16 2007
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+# Stop balancer daemon.
+# Run this on the machine where the balancer is running
+
+hadoop-daemon.sh stop balancer

Propchange: lucene/hadoop/trunk/bin/stop-balancer.sh
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/bin/stop-balancer.sh
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/bin/stop-balancer.sh
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Dec  5 11:49:16 2007
@@ -389,6 +389,16 @@
 </property>
 
 <property>
+  <name>dfs.balance.bandwidthPerSec</name>
+  <value>1048576</value>
+  <description>
+        Specifies the maximum amount of bandwidth that each datanode
+        can utilize for the balancing purpose in terms of
+        the number of bytes per second.
+  </description>
+</property>
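+
+<!-- As an illustration (not part of the committed defaults): to let each
+     datanode use up to 10MB/s for balancing, override this value in
+     hadoop-site.xml:
+       <property>
+         <name>dfs.balance.bandwidthPerSec</name>
+         <value>10485760</value>
+       </property>
+-->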
+
+<property>
   <name>dfs.hosts</name>
   <value></value>
   <description>Names a file that contains a list of hosts that are

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java?rev=601491&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java Wed Dec  5 11:49:16 2007
@@ -0,0 +1,1511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Formatter;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
+ * when some datanodes become full or when new empty nodes join the cluster.
+ * The tool is deployed as an application program that can be run by the 
+ * cluster administrator on a live HDFS cluster while applications are
+ * adding and deleting files.
+ * 
+ * <p>SYNOPSIS
+ * <pre>
+ * To start:
+ *      bin/start-balancer.sh [-threshold <threshold>]
+ *      Example: bin/start-balancer.sh 
+ *                     start the balancer with a default threshold of 10%
+ *               bin/start-balancer.sh -threshold 5
+ *                     start the balancer with a threshold of 5%
+ * To stop:
+ *      bin/stop-balancer.sh
+ * </pre>
+ * 
+ * <p>DESCRIPTION
+ * <p>The threshold parameter is a fraction in the range of (0%, 100%) with a 
+ * default value of 10%. The threshold sets a target for whether the cluster 
+ * is balanced. A cluster is balanced if for each datanode, the utilization 
+ * of the node (ratio of used space at the node to total capacity of the node) 
+ * differs from the utilization of the cluster (ratio of used space in the cluster 
+ * to total capacity of the cluster) by no more than the threshold value. 
+ * The smaller the threshold, the more balanced a cluster will become. 
+ * It takes more time to run the balancer for small threshold values. 
+ * Also for a very small threshold the cluster may not be able to reach the 
+ * balanced state when applications write and delete files concurrently.
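+ * 
+ * <p>For example, suppose a cluster uses 300GB of a 1TB total capacity,
+ * i.e. an average utilization of 30%. With the default threshold of 10%,
+ * a datanode is considered balanced if its own utilization falls within
+ * [20%, 40%]; a node at 45% is over-utilized by 5% of its capacity.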
+ * 
+ * <p>The tool moves blocks from highly utilized datanodes to poorly 
+ * utilized datanodes iteratively. In each iteration a datanode moves or 
+ * receives no more than the lesser of 10G bytes or the threshold fraction 
+ * of its capacity. Each iteration runs no more than 20 minutes.
+ * At the end of each iteration, the balancer obtains updated datanodes
+ * information from the namenode.
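+ * 
+ * <p>As a concrete example, a datanode with a 500GB capacity and the
+ * default threshold of 10% moves or receives at most min(10GB, 50GB),
+ * i.e. 10GB, in a single iteration.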
+ * 
+ * <p>A configuration property that limits the balancer's use of bandwidth is 
+ * defined in the default configuration file:
+ * <pre>
+ * <property>
+ *   <name>dfs.balance.bandwidthPerSec</name>
+ *   <value>1048576</value>
+ * <description>  Specifies the maximum bandwidth that each datanode 
+ * can utilize for the balancing purpose in terms of the number of bytes 
+ * per second. </description>
+ * </property>
+ * </pre>
+ * 
+ * <p>This property determines the maximum speed at which a block will be 
+ * moved from one datanode to another. The default value is 1MB/s. The higher 
+ * the bandwidth, the faster a cluster can reach the balanced state, 
+ * but with greater competition with application processes. If an 
+ * administrator changes the value of this property in the configuration 
+ * file, the change is observed when HDFS is next restarted.
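+ * 
+ * <p>As a rough illustration, at the default 1MB/s a single 64MB block
+ * takes about 64 seconds to transfer; raising the limit to 8MB/s cuts
+ * this to about 8 seconds, at the cost of more competition with
+ * application traffic.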
+ * 
+ * <p>MONITORING BALANCER PROGRESS
+ * <p>After the balancer is started, an output file name where the balancer 
+ * progress will be recorded is printed on the screen.  The administrator 
+ * can monitor the running of the balancer by reading the output file. 
+ * The output shows the balancer's status iteration by iteration. In each 
+ * iteration it prints the starting time, the iteration number, the total 
+ * number of bytes that have been moved in the previous iterations, 
+ * the total number of bytes that are left to move in order for the cluster 
+ * to be balanced, and the number of bytes that are being moved in this 
+ * iteration. Normally "Bytes Already Moved" is increasing while "Bytes Left 
+ * To Move" is decreasing.
+ * 
+ * <p>Running multiple instances of the balancer in an HDFS cluster is 
+ * prohibited by the tool.
+ * 
+ * <p>The balancer automatically exits when any of the following five 
+ * conditions is satisfied:
+ * <ol>
+ * <li>The cluster is balanced;
+ * <li>No block can be moved;
+ * <li>No block has been moved for five consecutive iterations;
+ * <li>An IOException occurs while communicating with the namenode;
+ * <li>Another balancer is running.
+ * </ol>
+ * 
+ * <p>Upon exit, a balancer returns an exit code and prints one of the 
+ * following messages to the output file, corresponding to the above exit 
+ * reasons:
+ * <ol>
+ * <li>The cluster is balanced. Exiting...
+ * <li>No block can be moved. Exiting...
+ * <li>No block has been moved for 5 iterations. Exiting...
+ * <li>Received an IO exception: failure reason. Exiting...
+ * <li>Another balancer is running. Exiting...
+ * </ol>
+ * 
+ * <p>The administrator can interrupt the execution of the balancer at any 
+ * time by running the command "stop-balancer.sh" on the machine where the 
+ * balancer is running.
+ */
+
+public class Balancer implements Tool {
+  private static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.dfs.Balancer");
+  final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+
+  private Configuration conf;
+
+  private double threshold = 10D;
+  private NamenodeProtocol namenode;
+  private ClientProtocol client;
+  private FileSystem fs;
+  private final static Random rnd = new Random();
+  
+  // all data node lists
+  private Collection<Source> overUtilizedDatanodes
+                               = new LinkedList<Source>();
+  private Collection<Source> aboveAvgUtilizedDatanodes
+                               = new LinkedList<Source>();
+  private Collection<BalancerDatanode> belowAvgUtilizedDatanodes
+                               = new LinkedList<BalancerDatanode>();
+  private Collection<BalancerDatanode> underUtilizedDatanodes
+                               = new LinkedList<BalancerDatanode>();
+  
+  private Collection<Source> sources
+                               = new HashSet<Source>();
+  private Collection<BalancerDatanode> targets
+                               = new HashSet<BalancerDatanode>();
+  
+  private Map<Block, BalancerBlock> globalBlockList
+                 = new HashMap<Block, BalancerBlock>();
+  private Map<Block, BalancerBlock> movedBlocks 
+                 = new HashMap<Block, BalancerBlock>();
+  private Map<String, BalancerDatanode> datanodes
+                 = new HashMap<String, BalancerDatanode>();
+  
+  private NetworkTopology cluster = new NetworkTopology();
+  
+  private double avgUtilization = 0.0D;
+  
+  /* This class keeps track of a scheduled block move */
+  private class PendingBlockMove {
+    private BalancerBlock block;
+    private Source source;
+    private BalancerDatanode proxySource;
+    private BalancerDatanode target;
+    
+    /** constructor */
+    private PendingBlockMove() {
+    }
+    
+    /* choose a block & a proxy source for this pendingMove 
+     * whose source & target have already been chosen.
+     * 
+     * Return true if a block and its proxy are chosen; false otherwise
+     */
+    private boolean chooseBlockAndProxy() {
+      // iterate all of the source's blocks until we find a good one
+      for (Iterator<BalancerBlock> blocks=
+        source.getBlockIterator(); blocks.hasNext();) {
+        if (markMovedIfGoodBlock(blocks.next())) {
+          blocks.remove();
+          return true;
+        }
+      }
+      return false;
+    }
+    
+    /* Return true if the given block is good for the tentative move;
+     * if it is good, add it to the moved list to mark it as "Moved".
+     * A block is good if
+     * 1. it is a good candidate; see isGoodBlockCandidate
+     * 2. can find a proxy source that's not busy for this move
+     */
+    private boolean markMovedIfGoodBlock(BalancerBlock block) {
+      synchronized(block) {
+        synchronized(movedBlocks) {
+          if (isGoodBlockCandidate(source, target, block)) {
+            this.block = block;
+            if ( chooseProxySource() ) {
+              addToMoved(block);
+              LOG.info("Decided to move block "+ block.getBlockId()
+                  +" with a length of "+FsShell.byteDesc(block.getNumBytes())
+                  + " bytes from " + source.getName() 
+                  + " to " + target.getName()
+                  + " using proxy source " + proxySource.getName() );
+              return true;
+            }
+          }
+        }
+      }
+      return false;
+    }
+    
+    /* Now that the source, target, and block are chosen, find a proxy source.
+     * 
+     * @return true if a proxy is found; otherwise false
+     */
+    private boolean chooseProxySource() {
+      // check if there is a replica on the same rack as the target
+      for (BalancerDatanode loc : block.getLocations()) {
+        if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
+          if (loc.addPendingBlock(this)) {
+            proxySource = loc;
+            return true;
+          }
+        }
+      }
+      // otherwise, find a non-busy replica
+      for (BalancerDatanode loc : block.getLocations()) {
+        if (loc.addPendingBlock(this)) {
+          proxySource = loc;
+          return true;
+        }
+      }
+      return false;
+    }
+    
+    /* Dispatch the block move task to the proxy source & wait for the response
+     */
+    private void dispatch() {
+      Socket sock = new Socket();
+      DataOutputStream out = null;
+      DataInputStream in = null;
+      try {
+        sock.connect(DataNode.createSocketAddr(
+            proxySource.datanode.getName()), FSConstants.READ_TIMEOUT);
+        long bandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
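+        // timeout budget: two read timeouts plus ~1.5x the nominal transfer time in ms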
+        sock.setSoTimeout(2*FSConstants.READ_TIMEOUT+
+            (int)(block.getNumBytes()*1500/bandwidth));
+        out = new DataOutputStream( new BufferedOutputStream(
+            sock.getOutputStream(), FSConstants.BUFFER_SIZE));
+        sendRequest(out);
+        in = new DataInputStream( new BufferedInputStream(
+            sock.getInputStream(), FSConstants.BUFFER_SIZE));
+        receiveResponse(in);
+        bytesMoved.inc(block.getNumBytes());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug( "Moving block " + block.getBlock().getBlockId() +
+              " from "+ source.getName() + " to " +
+              target.getName() + " through " +
+              proxySource.getName() +
+              " succeeded." );
+        }
+      } catch (SocketTimeoutException te) { 
+        LOG.warn("Timeout moving block "+block.getBlockId()+
+            " from " + source.getName() + " to " +
+            target.getName() + " through " +
+            proxySource.getName());
+      } catch (IOException e) {
+        LOG.warn("Error moving block "+block.getBlockId()+
+            " from " + source.getName() + " to " +
+            target.getName() + " through " +
+            proxySource.getName() +
+            ": "+e.getMessage()+ "\n" +
+            StringUtils.stringifyException(e) );
+      } finally {
+        IOUtils.closeStream(out);
+        IOUtils.closeStream(in);
+        IOUtils.closeSocket(sock);
+        
+        proxySource.removePendingBlock(this);
+        synchronized(target) {
+          target.removePendingBlock(this);
+        }
+
+        synchronized (this ) {
+          reset();
+        }
+        synchronized (Balancer.this) {
+          Balancer.this.notifyAll();
+        }
+      }
+    }
+    
+    /* Send a block copy request to the outputstream*/
+    private void sendRequest(DataOutputStream out) throws IOException {
+      out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+      out.writeByte(FSConstants.OP_COPY_BLOCK);
+      out.writeLong(block.getBlock().getBlockId());
+      Text.writeString(out, source.getStorageID());
+      target.write(out);
+      out.flush();
+    }
+    
+    /* Receive a block copy response from the input stream */ 
+    private void receiveResponse(DataInputStream in) throws IOException {
+      short status = in.readShort();
+      if (status != FSConstants.OP_STATUS_SUCCESS) {
+        throw new IOException("Moving block "+block.getBlockId()+
+            " from "+source.getName() + " to " +
+            target.getName() + " through " +
+            proxySource.getName() +
+        "failed");
+      }
+    }
+
+    /* reset the object */
+    private void reset() {
+      block = null;
+      source = null;
+      proxySource = null;
+      target = null;
+    }
+    
+    /* start a thread to dispatch the block move */
+    private void scheduleBlockMove() {
+      BlockMover blockMover = new BlockMover();
+      blockMover.setDaemon(true);
+      blockMover.setName("Block mover for "+ block.getBlockId() +
+          " from " + proxySource.getName() + " to " + target.getName());
+      LOG.info("Starting " + blockMover.getName());
+      blockMover.start();
+    }
+    
+    /* A thread for moving a block */
+    private class BlockMover extends Thread {
+      BlockMover() {
+      }
+      
+      public void run() {
+        dispatch();
+      }
+    }
+  }
+  
+  /* A class for keeping track of blocks in the Balancer */
+  static private class BalancerBlock {
+    private Block block; // the block
+    private List<BalancerDatanode> locations
+            = new ArrayList<BalancerDatanode>(3); // its locations
+    
+    /* Constructor */
+    private BalancerBlock(Block block) {
+      this.block = block;
+    }
+    
+    /* clean block locations */
+    private synchronized void clearLocations() {
+      locations.clear();
+    }
+    
+    /* add a location */
+    private synchronized void addLocation(BalancerDatanode datanode) {
+      if (!locations.contains(datanode)) {
+        locations.add(datanode);
+      }
+    }
+    
+    /* Return if the block is located on <code>datanode</code> */
+    private synchronized boolean isLocatedOnDatanode(
+        BalancerDatanode datanode) {
+      return locations.contains(datanode);
+    }
+    
+    /* Return its locations */
+    private synchronized List<BalancerDatanode> getLocations() {
+      return locations;
+    }
+    
+    /* Return the block */
+    private Block getBlock() {
+      return block;
+    }
+    
+    /* Return the block id */
+    private long getBlockId() {
+      return block.getBlockId();
+    }
+    
+    /* Return the length of the block */
+    private long getNumBytes() {
+      return block.getNumBytes();
+    }
+  }
+  
+  /* This class represents a desired move of a number of bytes 
+   * from a source node to a target node.
+   * An object of this class is stored in a source node. 
+   */
+  static private class NodeTask {
+    private BalancerDatanode datanode; //target node
+    private long size;  //bytes scheduled to move
+    
+    /* constructor */
+    private NodeTask(BalancerDatanode datanode, long size) {
+      this.datanode = datanode;
+      this.size = size;
+    }
+    
+    /* Get the node */
+    private BalancerDatanode getDatanode() {
+      return datanode;
+    }
+    
+    /* Get the number of bytes that need to be moved */
+    private long getSize() {
+      return size;
+    }
+  }
+  
+  /* Return the utilization of a datanode */
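+  // e.g. a datanode using 30GB of a 100GB capacity has a utilization of 30.0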
+  static private double getUtilization(DatanodeInfo datanode) {
+    return ((double)datanode.getDfsUsed())/datanode.getCapacity()*100;
+  }
+  
+  /* A class that keeps track of a datanode in Balancer */
+  private static class BalancerDatanode implements Writable {
+    final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
+    final protected static short MAX_NUM_CONCURRENT_MOVES =
+      DataNode.MAX_BALANCING_THREADS;
+    protected DatanodeInfo datanode;
+    private double utilization;
+    protected long maxSizeToMove;
+    protected long scheduledSize = 0L;
+    //  blocks being moved but not confirmed yet
+    private List<PendingBlockMove> pendingBlocks = 
+      new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); 
+    
+    /* Constructor 
+     * Depending on avgUtil & threshold, calculate the maximum bytes to move 
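+     * e.g. with avgUtil=50 and threshold=10, a node at 65% utilization
+     * may move up to 10% of its capacity (further capped at 10GB), while
+     * a node at 55% may move up to 5% of its capacity.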
+     */
+    private BalancerDatanode(
+        DatanodeInfo node, double avgUtil, double threshold) {
+      datanode = node;
+      utilization = Balancer.getUtilization(node);
+        
+      if (utilization >= avgUtil+threshold
+          || utilization <= avgUtil-threshold) { 
+        maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
+      } else {
+        maxSizeToMove = 
+          (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100);
+      }
+      if (utilization < avgUtil ) {
+        maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
+      }
+      maxSizeToMove = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+    }
+    
+    /** Get the datanode */
+    protected DatanodeInfo getDatanode() {
+      return datanode;
+    }
+    
+    /** Get the name of the datanode */
+    protected String getName() {
+      return datanode.getName();
+    }
+    
+    /* Get the storage id of the datanode */
+    protected String getStorageID() {
+      return datanode.getStorageID();
+    }
+    
+    /** Decide if more bytes still need to be moved */
+    protected boolean isMoveQuotaFull() {
+      return scheduledSize<maxSizeToMove;
+    }
+
+    /** Return the total number of bytes that need to be moved */
+    protected long availableSizeToMove() {
+      return maxSizeToMove-scheduledSize;
+    }
+    
+    /* increment scheduled size */
+    protected void incScheduledSize(long size) {
+      scheduledSize += size;
+    }
+    
+    /* Check if the node can schedule more blocks to move */
+    synchronized private boolean isPendingQNotFull() {
+      if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
+        return true;
+      }
+      return false;
+    }
+    
+    /* Check if all the dispatched moves are done */
+    synchronized private boolean isPendingQEmpty() {
+      return pendingBlocks.isEmpty();
+    }
+    
+    /* Add a scheduled block move to the node */
+    private synchronized boolean addPendingBlock(
+        PendingBlockMove pendingBlock) {
+      if (isPendingQNotFull()) {
+        return pendingBlocks.add(pendingBlock);
+      }
+      return false;
+    }
+    
+    /* Remove a scheduled block move from the node */
+    private synchronized boolean  removePendingBlock(
+        PendingBlockMove pendingBlock) {
+      return pendingBlocks.remove(pendingBlock);
+    }
+
+    /** The following two methods support the Writable interface */
+    /** Deserialize */
+    public void readFields(DataInput in) throws IOException {
+      datanode.readFields(in);
+    }
+
+    /** Serialize */
+    public void write(DataOutput out) throws IOException {
+      datanode.write(out);
+    }
+  }
+  
+  /** A node that can be the source of a block move */
+  private class Source extends BalancerDatanode {
+    
+    /* A thread that initiates a block move 
+     * and waits for block move to complete */
+    private class BlockMoveDispatcher extends Thread {
+      public void run() {
+        dispatchBlocks();
+      }
+    }
+    
+    private ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2);
+    private long blocksToReceive = 0L;
+    /* source blocks point to balancerBlocks in the global list because
+     * we want to keep one copy of a block in balancer and be aware that
+     * the locations are changing over time.
+     */
+    private List<BalancerBlock> srcBlockList
+            = new ArrayList<BalancerBlock>();
+    
+    /* constructor */
+    private Source(DatanodeInfo node, double avgUtil, double threshold) {
+      super(node, avgUtil, threshold);
+    }
+    
+    /** Add a node task */
+    private void addNodeTask(NodeTask task) {
+      assert (task.datanode != this) :
+        "Source and target are the same " + datanode.getName();
+      incScheduledSize(task.getSize());
+      nodeTasks.add(task);
+    }
+    
+    /* Return an iterator to this source's blocks */
+    private Iterator<BalancerBlock> getBlockIterator() {
+      return srcBlockList.iterator();
+    }
+    
+    /* fetch new blocks of this source from the namenode and
+     * update this source's block list & the global block list.
+     * Return the total size of the received blocks in bytes.
+     */
+    private long getBlockList() throws IOException {
+      BlockWithLocations[] newBlocks = namenode.getBlocks(datanode, 
+        (long)Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
+      long bytesReceived = 0;
+      for (BlockWithLocations blk : newBlocks) {
+        bytesReceived += blk.getBlock().getNumBytes();
+        BalancerBlock block;
+        synchronized(globalBlockList) {
+          block = globalBlockList.get(blk.getBlock());
+          if (block==null) {
+            block = new BalancerBlock(blk.getBlock());
+            globalBlockList.put(blk.getBlock(), block);
+          } else {
+            block.clearLocations();
+          }
+        
+          synchronized (block) {
+            // update locations
+            for ( String location : blk.getDatanodes() ) {
+              BalancerDatanode datanode = datanodes.get(location);
+              if (datanode != null) { // not an unknown datanode
+                block.addLocation(datanode);
+              }
+            }
+          }
+          if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) {
+            // filter bad candidates
+            srcBlockList.add(block);
+          }
+        }
+      }
+      return bytesReceived;
+    }
+
+    /* Decide if the given block is a good candidate to move or not */
+    private boolean isGoodBlockCandidate(BalancerBlock block) {
+      for (NodeTask nodeTask : nodeTasks) {
+        if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /* Return a block that's good for the source thread to dispatch immediately
+     * The block's source, target, and proxy source are determined too.
+     * When choosing proxy and target, source & target throttling
+     * has been considered. They are chosen only when they have the capacity
+     * to support this block move.
+     * The block should be dispatched immediately after this method returns.
+     */
+    private PendingBlockMove chooseNextBlockToMove() {
+      for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) {
+        NodeTask task = tasks.next();
+        BalancerDatanode target = task.getDatanode();
+        PendingBlockMove pendingBlock = new PendingBlockMove();
+        if ( target.addPendingBlock(pendingBlock) ) { 
+          // target is not busy, so do a tentative block allocation
+          pendingBlock.source = this;
+          pendingBlock.target = target;
+          if ( pendingBlock.chooseBlockAndProxy() ) {
+            long blockSize = pendingBlock.block.getNumBytes(); 
+            scheduledSize -= blockSize;
+            task.size -= blockSize;
+            if (task.size == 0) {
+              tasks.remove();
+            }
+            return pendingBlock;
+          } else {
+            // cancel the tentative move
+            target.removePendingBlock(pendingBlock);
+          }
+        }
+      }
+      return null;
+    }
+
+    /* iterate all source's blocks to remove moved ones */    
+    private void filterMovedBlocks() {
+      for (Iterator<BalancerBlock> blocks=getBlockIterator();
+            blocks.hasNext();) {
+        if (isMoved(blocks.next())) {
+          blocks.remove();
+        }
+      }
+    }
+    
+    private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5;
+    /* Return whether more blocks should be fetched from the namenode */
+    private boolean shouldFetchMoreBlocks() {
+      return srcBlockList.size()<SOURCE_BLOCK_LIST_MIN_SIZE &&
+                 blocksToReceive>0;
+    }
+    
+    /* This method iteratively does the following:
+     * it first selects a block to move,
+     * then sends a request to the proxy source to start the block move;
+     * when the source's block list falls below a threshold, it asks
+     * the namenode for more blocks.
+     * It terminates when it has dispatched enough block move tasks, or
+     * it has received enough blocks from the namenode, or 
+     * the elapsed time of the iteration has exceeded the max time limit.
+     */ 
+    private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
+    private void dispatchBlocks() {
+      long startTime = FSNamesystem.now();
+      this.blocksToReceive = 2*scheduledSize;
+      boolean isTimeUp = false;
+      while(!isTimeUp && scheduledSize>0 &&
+          (!srcBlockList.isEmpty() || blocksToReceive>0)) {
+        PendingBlockMove pendingBlock = chooseNextBlockToMove();
+        if (pendingBlock != null) {
+          // move the block
+          pendingBlock.scheduleBlockMove();
+          continue;
+        }
+        
+        /* Since we can not schedule any block to move,
+         * filter any moved blocks from the source block list and
+         * check if we should fetch more blocks from the namenode
+         */
+        filterMovedBlocks(); // filter already moved blocks
+        if (shouldFetchMoreBlocks()) {
+          // fetch new blocks
+          try {
+            blocksToReceive -= getBlockList();
+            continue;
+          } catch (IOException e) {
+            LOG.warn(StringUtils.stringifyException(e));
+            return;
+          }
+        } 
+        
+        // check if time is up or not
+        if (FSNamesystem.now()-startTime > MAX_ITERATION_TIME) {
+          isTimeUp = true;
+          continue;
+        }
+        
+        /* Now we can not schedule any block to move and there are
+         * no new blocks added to the source block list, so we wait. 
+         */
+        try {
+          synchronized(Balancer.this) {
+            Balancer.this.wait(1000);  // wait for targets/sources to be idle
+          }
+        } catch (InterruptedException ignored) {
+        }
+      }
+    }
+  }
+  
+  /** Default constructor */
+  Balancer() {
+  }
+  
+  /** Construct a balancer from the given configuration */
+  Balancer(Configuration conf) {
+    setConf(conf);
+  } 
+
+  /** Construct a balancer from the given configuration and threshold */
+  Balancer(Configuration conf, double threshold) {
+    setConf(conf);
+    this.threshold = threshold;
+  }
+
+  /**
+   * Run a balancer
+   * @param args
+   */
+  public static void main(String[] args) {
+    try {
+      System.exit( ToolRunner.run(null, new Balancer(), args) );
+    } catch (Throwable e) {
+      LOG.error(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+
+  }
+
+  private static void printUsage() {
+    System.out.println("Usage: java Balancer");
+    System.out.println("          [-threshold <threshold>]\t" 
+        +"percentage of disk capacity");
+  }
+
+  /* parse argument to get the threshold */
+  private double parseArgs(String[] args) {
+    double threshold=0;
+    int argsLen = (args == null) ? 0 : args.length;
+    if (argsLen==0) {
+      threshold = 10;
+    } else {
+      if (argsLen != 2 || !"-threshold".equalsIgnoreCase(args[0])) {
+        printUsage();
+        throw new IllegalArgumentException(Arrays.toString(args));
+      } else {
+        try {
+          threshold = Double.parseDouble(args[1]);
+          if (threshold < 0 || threshold >100) {
+            throw new NumberFormatException();
+          }
+          LOG.info( "Using a threshold of " + threshold );
+        } catch(NumberFormatException e) {
+          System.err.println(
+              "Expect a double parameter in the range of [0, 100]: "+ args[1]);
+          printUsage();
+          throw e;
+        }
+      }
+    }
+    return threshold;
+  }
+  
+  /* Initialize the balancer. It sets the value of the threshold and 
+   * builds two communication proxies to the namenode: one acting as 
+   * a client and one acting as a secondary namenode, plus retry proxies 
+   * for when a connection fails.
+   */
+  private void init(double threshold) throws IOException {
+    this.threshold = threshold;
+    // get name node address 
+    InetSocketAddress nameNodeAddr = DataNode.createSocketAddr(
+        conf.get("fs.default.name", "local"));
+    // connect to name node
+    this.namenode = createNamenode(nameNodeAddr, conf);
+    this.client = DFSClient.createNamenode(nameNodeAddr, conf);
+    this.fs = FileSystem.get(conf);
+  }
+  
+  /* Build a NamenodeProtocol connection to the namenode and
+   * set up the retry policy */ 
+  private static NamenodeProtocol createNamenode(
+      InetSocketAddress nameNodeAddr, Configuration conf) throws IOException {
+    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
+        5, 200, TimeUnit.MILLISECONDS);
+    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(
+        timeoutPolicy, exceptionToPolicyMap);
+    Map<String,RetryPolicy> methodNameToPolicyMap =
+        new HashMap<String, RetryPolicy>();
+    methodNameToPolicyMap.put("getBlocks", methodPolicy);
+
+    return (NamenodeProtocol) RetryProxy.create(
+        NamenodeProtocol.class,
+        RPC.getProxy(NamenodeProtocol.class,
+                         NamenodeProtocol.versionID,
+                         nameNodeAddr, 
+                         conf),
+        methodNameToPolicyMap);
+  }
+  
+  /* Shuffle datanode array */
+  static private void shuffleArray(DatanodeInfo[] datanodes) {
+    for (int i=datanodes.length; i>1; i--) {
+      int randomIndex = rnd.nextInt(i);
+      DatanodeInfo tmp = datanodes[randomIndex];
+      datanodes[randomIndex] = datanodes[i-1];
+      datanodes[i-1] = tmp;
+    }
+  }
+  
+  /* get all live datanodes of a cluster and their disk usage, and
+   * decide the number of bytes that need to be moved
+   */
+  private long initNodes() throws IOException {
+    return initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));
+  }
+  
+  /* Given a data node set, build a network topology and decide
+   * over-utilized datanodes, above average utilized datanodes, 
+   * below average utilized datanodes, and underutilized datanodes. 
+   * The input data node set is shuffled before the datanodes 
+   * are put into the over-utilized datanodes, above average utilized
+   * datanodes, below average utilized datanodes, and
+   * underutilized datanodes lists. This will add some randomness
+   * to the node matching later on.
+   * 
+   * @return the total number of bytes that are 
+   *                needed to move to make the cluster balanced.
+   * @param datanodes a set of datanodes
+   */
+  private long initNodes(DatanodeInfo[] datanodes) {
+    // compute average utilization
+    long totalCapacity=0L, totalUsedSpace=0L;
+    for (DatanodeInfo datanode : datanodes) {
+      totalCapacity += datanode.getCapacity();
+      totalUsedSpace += datanode.getDfsUsed();
+    }
+    this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
+
+    /* create the network topology and all datanode lists: 
+     * overloaded, above-average, below-average, and underloaded.
+     * The given datanodes array is shuffled first so that the node
+     * matching later on gets some randomness.
+     */
+    long overLoadedBytes = 0L, underLoadedBytes = 0L;
+    shuffleArray(datanodes);
+    for (DatanodeInfo datanode : datanodes) {
+      cluster.add(datanode);
+      BalancerDatanode datanodeS;
+      if (getUtilization(datanode) > avgUtilization) {
+        datanodeS = new Source(datanode, avgUtilization, threshold);
+        if (isAboveAvgUtilized(datanodeS)) {
+          this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
+        } else {
+          assert(isOverUtilized(datanodeS)) :
+            datanodeS.getName()+ "is not an overUtilized node";
+          this.overUtilizedDatanodes.add((Source)datanodeS);
+          overLoadedBytes += (long)((datanodeS.utilization-avgUtilization
+              -threshold)*datanodeS.datanode.getCapacity()/100.0);
+        }
+      } else {
+        datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold);
+        if ( isBelowAvgUtilized(datanodeS)) {
+          this.belowAvgUtilizedDatanodes.add(datanodeS);
+        } else {
+          assert (isUnderUtilized(datanodeS)) :
+            datanodeS.getName()+ "is not an underUtilized node"; 
+          this.underUtilizedDatanodes.add(datanodeS);
+          underLoadedBytes += (long)((avgUtilization-threshold-
+              datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
+        }
+      }
+      this.datanodes.put(datanode.getStorageID(), datanodeS);
+    }
+
+    //logging
+    logImbalancedNodes();
+    
+    assert (this.datanodes.size() == 
+      overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
+      aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
+      : "Mismatched number of datanodes";
+    
+    // return number of bytes to be moved in order to make the cluster balanced
+    return Math.max(overLoadedBytes, underLoadedBytes);
+  }
+
+  /* log the over utilized & under utilized nodes */
+  private void logImbalancedNodes() {
+    StringBuilder msg = new StringBuilder();
+    msg.append(overUtilizedDatanodes.size());
+    msg.append(" over utilized nodes:");
+    for (Source node : overUtilizedDatanodes) {
+      msg.append( " " );
+      msg.append( node.getName() );
+    }
+    LOG.info(msg);
+    msg = new StringBuilder();
+    msg.append(underUtilizedDatanodes.size());
+    msg.append(" under utilized nodes: ");
+    for (BalancerDatanode node : underUtilizedDatanodes) {
+      msg.append( " " );
+      msg.append( node.getName() );
+    }
+    LOG.info(msg);
+  }
+  
+  /* Decide all <source, target> pairs and
+   * the number of bytes to move from a source to a target
+   * Maximum bytes to be moved per node is
+   * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
+   * Return total number of bytes to move in this iteration
+   */
+  private long chooseNodes() {
+    // Match nodes on the same rack first
+    chooseNodes(true);
+    // Then match nodes on different racks
+    chooseNodes(false);
+    
+    assert (datanodes.size() == 
+      overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
+      aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()+
+      sources.size()+targets.size())
+      : "Mismatched number of datanodes";
+
+    long bytesToMove = 0L;
+    for (Source src : sources) {
+      bytesToMove += src.scheduledSize;
+    }
+    return bytesToMove;
+  }
+
+  /* if onRack is true, decide all <source, target> pairs
+   * where source and target are on the same rack; Otherwise
+   * decide all <source, target> pairs where source and target are
+   * on different racks
+   */
+  private void chooseNodes(boolean onRack) {
+    /* first step: match each overUtilized datanode (source) to
+     * one or more underUtilized datanodes (targets).
+     */
+    chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+    
+    /* match each remaining overutilized datanode (source) to 
+     * below average utilized datanodes (targets).
+     * Note only overutilized datanodes that haven't had their max bytes to
+     * move satisfied in step 1 are selected
+     */
+    chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+
+    /* match each remaining underutilized datanode to 
+     * above average utilized datanodes.
+     * Note only underutilized datanodes that have not had their max bytes to
+     * move satisfied in step 1 are selected.
+     */
+    chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+  }
+   
+  /* choose targets from the target candidate list for each over utilized
+   * source datanode. OnRackTarget determines if the chosen target 
+   * should be on the same rack as the source
+   */
+  private void chooseTargets(  
+      Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
+    for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
+        srcIterator.hasNext();) {
+      Source source = srcIterator.next();
+      while (chooseTarget(source, targetCandidates, onRackTarget)) {
+      }
+      if (!source.isMoveQuotaFull()) {
+        srcIterator.remove();
+      }
+    }
+    return;
+  }
+  
+  /* choose sources from the source candidate list for each under utilized
+   * target datanode. onRackSource determines if the chosen source 
+   * should be on the same rack as the target
+   */
+  private void chooseSources(
+      Iterator<Source> sourceCandidates, boolean onRackSource) {
+    for (Iterator<BalancerDatanode> targetIterator = 
+      underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
+      BalancerDatanode target = targetIterator.next();
+      while (chooseSource(target, sourceCandidates, onRackSource)) {
+      }
+      if (!target.isMoveQuotaFull()) {
+        targetIterator.remove();
+      }
+    }
+    return;
+  }
+
+  /* For the given source, choose targets from the target candidate list.
+   * OnRackTarget determines if the chosen target 
+   * should be on the same rack as the source
+   */
+  private boolean chooseTarget(Source source,
+      Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget) {
+    if (!source.isMoveQuotaFull()) {
+      return false;
+    }
+    boolean foundTarget = false;
+    BalancerDatanode target = null;
+    while (!foundTarget && targetCandidates.hasNext()) {
+      target = targetCandidates.next();
+      if (!target.isMoveQuotaFull()) {
+        targetCandidates.remove();
+        continue;
+      }
+      if (onRackTarget) {
+        // choose from on-rack nodes
+        if (cluster.isOnSameRack(source.datanode, target.datanode)) {
+          foundTarget = true;
+        }
+      } else {
+        // choose from off-rack nodes
+        if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
+          foundTarget = true;
+        }
+      }
+    }
+    if (foundTarget) {
+      assert(target != null):"Choose a null target";
+      long size = Math.min(source.availableSizeToMove(),
+          target.availableSizeToMove());
+      NodeTask nodeTask = new NodeTask(target, size);
+      source.addNodeTask(nodeTask);
+      target.incScheduledSize(nodeTask.getSize());
+      sources.add(source);
+      targets.add(target);
+      if (!target.isMoveQuotaFull()) {
+        targetCandidates.remove();
+      }
+      LOG.info("Decided to move "+FsShell.byteDesc(size)+" bytes from "
+          +source.datanode.getName() + " to " + target.datanode.getName());
+      return true;
+    }
+    return false;
+  }
+  
+  /* For the given target, choose sources from the source candidate list.
+   * OnRackSource determines if the chosen source 
+   * should be on the same rack as the target
+   */
+  private boolean chooseSource(BalancerDatanode target,
+      Iterator<Source> sourceCandidates, boolean onRackSource) {
+    if (!target.isMoveQuotaFull()) {
+      return false;
+    }
+    boolean foundSource = false;
+    Source source = null;
+    while (!foundSource && sourceCandidates.hasNext()) {
+      source = sourceCandidates.next();
+      if (!source.isMoveQuotaFull()) {
+        sourceCandidates.remove();
+        continue;
+      }
+      if (onRackSource) {
+        // choose from on-rack nodes
+        if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
+          foundSource = true;
+        }
+      } else {
+        // choose from off-rack nodes
+        if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
+          foundSource = true;
+        }
+      }
+    }
+    if (foundSource) {
+      assert(source != null):"Choose a null source";
+      long size = Math.min(source.availableSizeToMove(),
+          target.availableSizeToMove());
+      NodeTask nodeTask = new NodeTask(target, size);
+      source.addNodeTask(nodeTask);
+      target.incScheduledSize(nodeTask.getSize());
+      sources.add(source);
+      targets.add(target);
+      if ( !source.isMoveQuotaFull()) {
+        sourceCandidates.remove();
+      }
+      LOG.info("Decided to move "+FsShell.byteDesc(size)+" bytes from "
+          +source.datanode.getName() + " to " + target.datanode.getName());
+      return true;
+    }
+    return false;
+  }
+
+  private static class BytesMoved {
+    private long bytesMoved = 0L;
+    private synchronized void inc( long bytes ) {
+      bytesMoved += bytes;
+    }
+
+    private long get() {
+      return bytesMoved;
+    }
+  };
+  private BytesMoved bytesMoved = new BytesMoved();
+  private int notChangedIterations = 0;
+  
+  /* Start a thread to dispatch block moves for each source. 
+   * The thread selects blocks to move & sends request to proxy source to
+   * initiate block move. The process is flow controlled. Block selection is
+   * blocked if there are too many un-confirmed block moves.
+   * Return the total number of bytes successfully moved in this iteration.
+   */
+  private long dispatchBlockMoves() {
+    long bytesLastMoved = bytesMoved.get();
+    Source.BlockMoveDispatcher dispatchers[] =
+      new Source.BlockMoveDispatcher[sources.size()];
+    int i=0;
+    for (Source source : sources) {
+      dispatchers[i] = source.new BlockMoveDispatcher();
+      dispatchers[i].setName("Dispatcher for source " + source.getName());
+      LOG.info("Starting " + dispatchers[i].getName());
+      dispatchers[i++].start();
+    }
+    for (Source.BlockMoveDispatcher dispatcher : dispatchers) {
+      try {
+        dispatcher.join();
+      } catch (InterruptedException e) {
+        LOG.info(StringUtils.stringifyException(e));
+      }
+    }
+    waitForMoveCompletion();
+    return bytesMoved.get()-bytesLastMoved;
+  }
+  
+  // The sleeping period before re-checking whether block moves have completed
+  static private long blockMoveWaitTime = 30000L;
+  
+  /** set the sleeping period for block move completion check */
+  static void setBlockMoveWaitTime(long time) {
+    blockMoveWaitTime = time;
+  }
+  
+  /* wait for all block move confirmations 
+   * by checking each target's pendingMove queue 
+   */
+  private void waitForMoveCompletion() {
+    boolean shouldWait;
+    do {
+      shouldWait = false;
+      for (BalancerDatanode target : targets) {
+        if (!target.isPendingQEmpty()) {
+          shouldWait = true;
+        }
+      }
+      if (shouldWait) {
+        try {
+          Thread.sleep(blockMoveWaitTime);
+        } catch (InterruptedException ignored) {
+        }
+      }
+    } while (shouldWait);
+  }
+
+  /* mark a block to be moved */
+  private void addToMoved(BalancerBlock block) {
+    synchronized(movedBlocks) {
+      movedBlocks.put(block.getBlock(), block);
+    }
+  }
+  
+  /* check if a block is marked as moved */
+  private boolean isMoved(BalancerBlock block) {
+    synchronized(movedBlocks) {
+      return movedBlocks.containsKey(block.getBlock());
+    }
+  }
+  
+  /* Decide if it is OK to move the given block from source to target
+   * A block is a good candidate if
+   * 1. the block is not in the process of being moved/has not been moved;
+   * 2. the block does not have a replica on the target;
+   * 3. doing the move does not reduce the number of racks that the block has
+   */
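+  // e.g. an off-rack move is acceptable when the target's rack holds no
+  // replica of the block, or when another replica remains on the source's rack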
+  private boolean isGoodBlockCandidate(Source source, 
+      BalancerDatanode target, BalancerBlock block) {
+    // check if the block is moved or not
+    if (isMoved(block)) {
+      return false;
+    }
+    if (block.isLocatedOnDatanode(target)) {
+      return false;
+    }
+
+    boolean goodBlock = false;
+    if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
+      // good if source and target are on the same rack
+      goodBlock = true;
+    } else {
+      boolean notOnSameRack = true;
+      synchronized (block) {
+        for (BalancerDatanode loc : block.locations) {
+          if (cluster.isOnSameRack(loc.datanode, target.datanode)) {
+            notOnSameRack = false;
+            break;
+          }
+        }
+      }
+      if (notOnSameRack) {
+        // good if the target is not on the same rack as any replica
+        goodBlock = true;
+      } else {
+        // good if the source is on the same rack as one of the replicas
+        for (BalancerDatanode loc : block.locations) {
+          if (loc != source && 
+              cluster.isOnSameRack(loc.datanode, source.datanode)) {
+            goodBlock = true;
+            break;
+          }
+        }
+      }
+    }
+    return goodBlock;
+  }
+  
+  /* reset all fields in a balancer preparing for the next iteration */
+  private void resetData() {
+    this.cluster = new NetworkTopology();
+    this.overUtilizedDatanodes.clear();
+    this.aboveAvgUtilizedDatanodes.clear();
+    this.belowAvgUtilizedDatanodes.clear();
+    this.underUtilizedDatanodes.clear();
+    this.datanodes.clear();
+    this.sources.clear();
+    this.targets.clear();  
+    this.avgUtilization = 0.0D;
+    cleanGlobalBlockList();
+  }
+  
+  /* Remove all blocks from the global block list except for the ones in the
+   * moved list.
+   */
+  private void cleanGlobalBlockList() {
+    for (Iterator<Block> globalBlockListIterator=globalBlockList.keySet().iterator();
+        globalBlockListIterator.hasNext();) {
+      Block block = globalBlockListIterator.next();
+      if(!movedBlocks.containsKey(block)) {
+        globalBlockListIterator.remove();
+      }
+    }
+  }
+  
+  /* Return true if the given datanode is overUtilized */
+  private boolean isOverUtilized(BalancerDatanode datanode) {
+    return datanode.utilization > (avgUtilization+threshold);
+  }
+  
+  /* Return true if the given datanode is above average utilized
+   * but not overUtilized */
+  private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
+    return (datanode.utilization <= (avgUtilization+threshold))
+        && (datanode.utilization > avgUtilization);
+  }
+  
+  /* Return true if the given datanode is underUtilized */
+  private boolean isUnderUtilized(BalancerDatanode datanode) {
+    return datanode.utilization < (avgUtilization-threshold);
+  }
+
+  /* Return true if the given datanode is below average utilized 
+   * but not underUtilized */
+  private boolean isBelowAvgUtilized(BalancerDatanode datanode) {
+    return (datanode.utilization >= (avgUtilization-threshold))
+        && (datanode.utilization < avgUtilization);
+  }
+
+  // Exit status
+  final public static int SUCCESS = 1;
+  final public static int ALREADY_RUNNING = -1;
+  final public static int NO_MOVE_BLOCK = -2;
+  final public static int NO_MOVE_PROGRESS = -3;
+  final public static int IO_EXCEPTION = -4;
+  final public static int ILLEGAL_ARGS = -5;
+  /** main method of Balancer
+   * @param args arguments to a Balancer
+   * @throws Exception if any exception occurs during datanode balancing
+   */
+  public int run(String[] args) throws Exception {
+    long startTime = FSNamesystem.now();
+    OutputStream out = null;
+    try {
+      // initialize a balancer
+      init(parseArgs(args));
+      
+      /* Check if there is another balancer running.
+       * Exit if there is another one running.
+       */
+      out = checkAndMarkRunningBalancer(); 
+      if (out == null) {
+        System.out.println("Another balancer is running. Exiting...");
+        return ALREADY_RUNNING;
+      }
+
+      Formatter formatter = new Formatter(System.out);
+      System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
+      int iterations = 0;
+      while (true ) {
+        /* get all live datanodes of a cluster and their disk usage
+         * decide the number of bytes need to be moved
+         */
+        long bytesLeftToMove = initNodes();
+        if (bytesLeftToMove == 0) {
+          System.out.println("The cluster is balanced. Exiting...");
+          return SUCCESS;
+        } else {
+          LOG.info( "Need to move "+ FsShell.byteDesc(bytesLeftToMove)
+              +" bytes to make the cluster balanced." );
+        }
+        
+        /* Decide all the nodes that will participate in the block move and
+         * the number of bytes that need to be moved from one node to another
+         * in this iteration. The maximum number of bytes to be moved per node
+         * is Min(1 bandwidth's worth of bytes, MAX_SIZE_TO_MOVE).
+         */
+        long bytesToMove = chooseNodes();
+        if (bytesToMove == 0) {
+          System.out.println("No block can be moved. Exiting...");
+          return NO_MOVE_BLOCK;
+        } else {
+          LOG.info( "Will move " + FsShell.byteDesc(bytesToMove) +
+              "bytes in this iteration");
+        }
+   
+        formatter.format("%-24s %10d  %19s  %18s  %17s\n", 
+            DateFormat.getDateTimeInstance().format(new Date()),
+            iterations,
+            FsShell.byteDesc(bytesMoved.get()),
+            FsShell.byteDesc(bytesLeftToMove),
+            FsShell.byteDesc(bytesToMove)
+            );
+        
+        /* For each pair of <source, target>, start a thread that repeatedly 
+         * decides a block to be moved and its proxy source, 
+         * then initiates the move until all bytes are moved or no more blocks
+         * are available to move.
+         * Exit if no bytes have been moved for 5 consecutive iterations.
+         */
+        if (dispatchBlockMoves() > 0) {
+          notChangedIterations = 0;
+        } else {
+          notChangedIterations++;
+          if (notChangedIterations >= 5) {
+            System.out.println(
+                "No block has been moved for 5 iterations. Exiting...");
+            return NO_MOVE_PROGRESS;
+          }
+        }
+
+        // clean all lists
+        resetData();
+        
+        try {
+          // sleep for two heartbeat intervals; the interval is in seconds
+          Thread.sleep(2*conf.getLong("dfs.heartbeat.interval", 3)*1000L);
+        } catch (InterruptedException ignored) {
+        }
+        
+        iterations++;
+      }
+    } catch (IllegalArgumentException ae) {
+      return ILLEGAL_ARGS;
+    } catch (IOException e) {
+      System.out.println("Received an IO exception: " + e.getMessage() +
+          " . Exiting...");
+      return IO_EXCEPTION;
+    } finally {
+      IOUtils.closeStream(out); 
+      try {
+        fs.delete(BALANCER_ID_PATH);
+      } catch(IOException ignored) {
+      }
+      System.out.println("Balancing took " + 
+          time2Str(FSNamesystem.now()-startTime));
+    }
+  }
+
+  private Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+  /* To make sure that no more than one balancer runs in an HDFS cluster,
+   * the balancer creates a file in HDFS, writes the hostname of the machine
+   * it is running on into the file, and does not close the file until it
+   * exits. A second balancer cannot run because it cannot create the file
+   * while the first one is running.
+   *
+   * This method checks whether a balancer is already running and, if not,
+   * marks this one as running.
+   * Note that this is an atomic operation.
+   *
+   * Return null if there is a running balancer; otherwise return the output
+   * stream to the newly created file.
+   */
+  private OutputStream checkAndMarkRunningBalancer() throws IOException {
+    try {
+      DataOutputStream out = fs.create(BALANCER_ID_PATH);
+      out.writeBytes(InetAddress.getLocalHost().getHostName());
+      out.flush();
+      return out;
+    } catch(RemoteException e) {
+      if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
+        return null;
+      } else {
+        throw e;
+      }
+    }
+  }
+  
+  /* Given elapsedTime in ms, return a printable string */
+  private static String time2Str(long elapsedTime) {
+    String unit;
+    double time = elapsedTime;
+    if (elapsedTime < 1000) {
+      unit = "milliseconds";
+    } else if (elapsedTime < 60*1000) {
+      unit = "seconds";
+      time = time/1000;
+    } else if (elapsedTime < 3600*1000) {
+      unit = "minutes";
+      time = time/(60*1000);
+    } else {
+      unit = "hours";
+      time = time/(3600*1000);
+    }
+
+    return time+" "+unit;
+  }
+
+  /** return this balancer's configuration */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** set this balancer's configuration */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}

Propchange: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL
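
For illustration, a standalone sketch (not part of the patch) of how the four
predicates near the end of Balancer.java partition datanodes around the
cluster-average utilization, with the configured threshold as the band width.
Note that a node whose utilization exactly equals the average falls into none
of the four lists, so it is neither a source nor a target. The names below
("UtilizationBands", "Band", "classify") and the sample values are
hypothetical:

    // Mirrors Balancer's four utilization predicates; utilization, avg and
    // threshold are percentages, as in the Balancer itself.
    public class UtilizationBands {
      enum Band { OVER, ABOVE_AVG, AT_AVG, BELOW_AVG, UNDER }

      static Band classify(double utilization, double avg, double threshold) {
        if (utilization > avg + threshold) return Band.OVER;       // isOverUtilized
        if (utilization > avg)             return Band.ABOVE_AVG;  // isAboveAvgUtilized
        if (utilization < avg - threshold) return Band.UNDER;      // isUnderUtilized
        if (utilization < avg)             return Band.BELOW_AVG;  // isBelowAvgUtilized
        return Band.AT_AVG;  // exactly average: in none of the four lists
      }

      public static void main(String[] args) {
        double avg = 50.0, threshold = 10.0;
        for (double u : new double[] {75, 55, 50, 45, 30}) {
          System.out.println(u + "% -> " + classify(u, avg, threshold));
        }
        // prints OVER, ABOVE_AVG, AT_AVG, BELOW_AVG, UNDER
      }
    }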

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Dec  5 11:49:16 2007
@@ -75,7 +75,7 @@
   private TreeMap<String, OutputStream> pendingCreates =
     new TreeMap<String, OutputStream>();
     
-  private static ClientProtocol createNamenode(
+  static ClientProtocol createNamenode(
       InetSocketAddress nameNodeAddr, Configuration conf)
     throws IOException {
     RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
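
The only change to DFSClient.java is widening createNamenode from private to
package-private so that the new Balancer (and TestBalancer, below) in the same
org.apache.hadoop.dfs package can obtain a ClientProtocol proxy to the
namenode. A minimal sketch of the usage pattern, mirroring the calls that
TestBalancer makes later in this patch ("connect" is a hypothetical helper and
assumes a running HDFS reachable via fs.default.name):

    // Obtain an RPC proxy to the namenode, as TestBalancer does.
    static ClientProtocol connect(Configuration conf) throws IOException {
      return DFSClient.createNamenode(
          DataNode.createSocketAddr(conf.get("fs.default.name")), conf);
    }
    // TestBalancer then polls connect(conf).getStats(); judging from the test,
    // stats[0] is the total capacity and stats[1] is the DFS used space.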

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Dec  5 11:49:16 2007
@@ -115,10 +115,10 @@
   int defaultBytesPerChecksum = 512;
 
   // The following three fields are to support balancing
-  final private static long BALANCE_BANDWIDTH = 1024L*1024; // 1MB/s
-  final private static short MAX_BALANCING_THREADS = 5;
+  final static short MAX_BALANCING_THREADS = 5;
   private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
-  private Throttler balancingThrottler = new Throttler(BALANCE_BANDWIDTH);
+  long balanceBandwidth;
+  private Throttler balancingThrottler;
 
   private static class DataNodeMetrics implements Updater {
     private final MetricsRecord metricsRecord;
@@ -288,6 +288,11 @@
     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
     DataNode.nameNodeAddr = nameNodeAddr;
 
+    //set up parameter for cluster balancing
+    this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
+    LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
+    this.balancingThrottler = new Throttler(balanceBandwidth);
+
     //create a servlet to serve full-file content
     String infoAddr = conf.get("dfs.datanode.http.bindAddress", "0.0.0.0:50075");
     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
@@ -554,8 +559,10 @@
         synchronized(receivedBlockList) {
           synchronized(delHints) {
             int numBlocks = receivedBlockList.size();
-            if (receivedBlockList.size() > 0) {
-              assert(numBlocks==delHints.size());
+            if (numBlocks > 0) {
+              if(numBlocks!=delHints.size()) {
+                LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
+              }
               //
               // Send newly-received blockids to namenode
               //
@@ -565,6 +572,9 @@
           }
         }
         if (blockArray != null) {
+          if(delHintArray == null || delHintArray.length != blockArray.length ) {
+            LOG.warn("Panic: block array & delHintArray are not the same" );
+          }
           namenode.blockReceived(dnRegistration, blockArray, delHintArray);
           synchronized (receivedBlockList) {
             synchronized (delHints) {
@@ -753,6 +763,9 @@
    * client? For now we don't.
    */
   private void notifyNamenodeReceivedBlock(Block block, String delHint) {
+    if(block==null || delHint==null) {
+      throw new IllegalArgumentException(block==null?"Block is null":"delHint is null");
+    }
     synchronized (receivedBlockList) {
       synchronized (delHints) {
         receivedBlockList.add(block);
@@ -1149,7 +1162,7 @@
         // notify name node
         notifyNamenodeReceivedBlock(block, sourceID);
 
-        LOG.info("Received block " + block + 
+        LOG.info("Moved block " + block + 
             " from " + s.getRemoteSocketAddress());
       } catch (IOException ioe) {
         opStatus = OP_STATUS_ERROR;
@@ -1181,7 +1194,7 @@
 
     /** Constructor */
     Throttler(long bandwidthPerSec) {
-      this(1000, bandwidthPerSec);  // by default throttling period is 1s
+      this(500, bandwidthPerSec);  // by default throttling period is 500ms 
     }
 
     /** Constructor */
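
Two balancing-related things change in DataNode.java: the bandwidth cap moves
from a hard-coded 1 MB/s to the new dfs.balance.bandwidthPerSec configuration
key, and the default throttling period drops from 1s to 500ms, presumably to
smooth bursts within each second. A minimal sketch of the period-based
throttling idea (a hypothetical "SimpleThrottler", not the actual Throttler
class):

    // Allow bytesPerPeriod every periodMs; sleep once the budget is spent.
    class SimpleThrottler {
      private final long periodMs;        // length of one throttle period
      private final long bytesPerPeriod;  // byte budget per period
      private long curBytes = 0;                              // bytes sent this period
      private long periodStart = System.currentTimeMillis();  // period start time

      SimpleThrottler(long periodMs, long bytesPerSec) {
        this.periodMs = periodMs;
        this.bytesPerPeriod = bytesPerSec * periodMs / 1000;
      }

      /** Account for numBytes just sent; sleep while over this period's budget. */
      synchronized void throttle(long numBytes) {
        curBytes += numBytes;
        while (curBytes >= bytesPerPeriod) {
          long wait = periodStart + periodMs - System.currentTimeMillis();
          if (wait > 0) {
            try { Thread.sleep(wait); } catch (InterruptedException ignored) { }
          }
          periodStart = System.currentTimeMillis();
          curBytes -= bytesPerPeriod;
        }
      }
    }

With a 500ms period the sender can run at most half a second ahead of the
configured rate before it is forced to sleep, instead of a full second.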

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Wed Dec  5 11:49:16 2007
@@ -234,6 +234,13 @@
           + "] is less than the number of datanodes [" + numDataNodes + "].");
     }
 
+    if (simulatedCapacities != null 
+        && numDataNodes > simulatedCapacities.length) {
+      throw new IllegalArgumentException( "The length of simulatedCapacities [" 
+          + simulatedCapacities.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+
     // Set up the right ports for the datanodes
     conf.set("dfs.datanode.bindAddress", "0.0.0.0:0");
     conf.set("dfs.datanode.http.bindAddress", "0.0.0.0:0");
@@ -263,9 +270,8 @@
       }
       if (simulatedCapacities != null) {
         dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
-      }
-      if (simulatedCapacities != null && i < simulatedCapacities.length) {
-        dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, simulatedCapacities[i]);
+        dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
+            simulatedCapacities[i-curDatanodesNum]);
       }
       System.out.println("Starting DataNode " + i + " with dfs.data.dir: " 
                          + dnConf.get("dfs.data.dir"));
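
The MiniDFSCluster changes tighten the simulated-capacity handling: the new
length check fails fast instead of silently skipping nodes without a capacity
entry, and indexing by (i - curDatanodesNum) lets an incremental
startDataNodes call consume its own capacities array rather than re-reading
the initial one. A sketch of the pattern, with values borrowed from
TestBalancer below:

    // Start two simulated datanodes, then add a third with its own capacity.
    // The one-element arrays in the second call are indexed from zero thanks
    // to the (i - curDatanodesNum) change above.
    Configuration conf = new Configuration();
    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
    MiniDFSCluster cluster = new MiniDFSCluster(0, conf, 2, true, true, null,
        new String[] {"/rack0", "/rack1"}, new long[] {500L, 500L});
    cluster.startDataNodes(conf, 1, true, null,
        new String[] {"/rack2"}, new long[] {500L});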

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java?rev=601491&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java Wed Dec  5 11:49:16 2007
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+/**
+ * This class tests if a balancer schedules tasks correctly.
+ */
+public class TestBalancer extends TestCase {
+  private static final Configuration CONF = new Configuration();
+  final private static long CAPACITY = 500L;
+  final private static String RACK0 = "/rack0";
+  final private static String RACK1 = "/rack1";
+  final private static String RACK2 = "/rack2";
+  final static private String fileName = "/tmp.txt";
+  final static private Path filePath = new Path(fileName);
+  private MiniDFSCluster cluster;
+
+  ClientProtocol client;
+
+  static final int DEFAULT_BLOCK_SIZE = 10;
+  private Balancer balancer;
+  private Random r = new Random();
+
+  static {
+    CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
+    CONF.setLong("dfs.heartbeat.interval", 1L);
+    CONF.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    Balancer.setBlockMoveWaitTime(1000L) ;
+  }
+
+  /* create a file with a length of <code>fileLen</code> */
+  private void createFile(long fileLen, short replicationFactor)
+  throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, filePath, fileLen, 
+        replicationFactor, r.nextLong());
+    DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
+  }
+
+
+  /* fill up a cluster with <code>numNodes</code> datanodes 
+   * whose used space is <code>size</code>
+   */
+  private Block[] generateBlocks(long size, short numNodes) throws IOException {
+    cluster = new MiniDFSCluster( CONF, numNodes, true, null);
+    try {
+      cluster.waitActive();
+      client = DFSClient.createNamenode(
+          DataNode.createSocketAddr(CONF.get("fs.default.name")), CONF);
+
+      short replicationFactor = (short)(numNodes-1);
+      long fileLen = size/replicationFactor;
+      createFile(fileLen, replicationFactor);
+
+      List<LocatedBlock> locatedBlocks = cluster.getNameNode().
+      getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
+
+      int numOfBlocks = locatedBlocks.size();
+      Block[] blocks = new Block[numOfBlocks];
+      for(int i=0; i<numOfBlocks; i++) {
+        Block b = locatedBlocks.get(i).getBlock();
+        blocks[i] = new Block(b.getBlockId(), b.getNumBytes());
+      }
+
+      return blocks;
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /* Distribute all blocks according to the given distribution */
+  Block[][] distributeBlocks(Block[] blocks, short replicationFactor, 
+      final long[] distribution ) {
+    // make a copy
+    long[] usedSpace = new long[distribution.length];
+    System.arraycopy(distribution, 0, usedSpace, 0, distribution.length);
+
+    List<List<Block>> blockReports = 
+      new ArrayList<List<Block>>(usedSpace.length);
+    Block[][] results = new Block[usedSpace.length][];
+    for(int i=0; i<usedSpace.length; i++) {
+      blockReports.add(new ArrayList<Block>());
+    }
+    for(int i=0; i<blocks.length; i++) {
+      for(int j=0; j<replicationFactor; j++) {
+        boolean notChosen = true;
+        while(notChosen) {
+          int chosenIndex = r.nextInt(usedSpace.length);
+          if( usedSpace[chosenIndex]>0 ) {
+            notChosen = false;
+            blockReports.get(chosenIndex).add(blocks[i]);
+            usedSpace[chosenIndex] -= blocks[i].getNumBytes();
+          }
+        }
+      }
+    }
+    for(int i=0; i<usedSpace.length; i++) {
+      List<Block> nodeBlockList = blockReports.get(i);
+      results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]);
+    }
+    return results;
+  }
+
+  /* We first start a cluster and fill it up to a certain size,
+   * then redistribute the blocks according to the required distribution.
+   * Afterwards, a balancer is run to balance the cluster.
+   */
+  private void testUnevenDistribution(
+      long distribution[], long capacities[], String[] racks) throws Exception {
+    int numDatanodes = distribution.length;
+    if (capacities.length != numDatanodes || racks.length != numDatanodes) {
+      throw new IllegalArgumentException("Array length is not the same");
+    }
+
+    // calculate total space that need to be filled
+    long totalUsedSpace=0L;
+    for(int i=0; i<distribution.length; i++) {
+      totalUsedSpace += distribution[i];
+    }
+
+    // fill the cluster
+    Block[] blocks = generateBlocks(totalUsedSpace, (short)numDatanodes);
+
+    // redistribute blocks
+    Block[][] blocksDN = distributeBlocks(
+        blocks, (short)(numDatanodes-1), distribution);
+
+    // restart the cluster: do NOT format the cluster
+    CONF.set("dfs.safemode.threshold.pct", "0.0f"); 
+    cluster = new MiniDFSCluster(0, CONF, numDatanodes,
+        false, true, null, racks, capacities);
+    cluster.waitActive();
+    client = DFSClient.createNamenode(
+        DataNode.createSocketAddr(CONF.get("fs.default.name")), CONF);
+
+    cluster.injectBlocks(blocksDN);
+
+    long totalCapacity = 0L;
+    for(long capacity:capacities) {
+      totalCapacity += capacity;
+    }
+    runBalancer(totalUsedSpace, totalCapacity);
+  }
+
+  /* wait until heartbeats report the expected total space and used space */
+  private void waitForHeartBeat( long expectedUsedSpace, long expectedTotalSpace )
+  throws IOException {
+    long[] status = client.getStats();
+    while(status[0] != expectedTotalSpace || status[1] != expectedUsedSpace ) {
+      try {
+        Thread.sleep(100L);
+      } catch(InterruptedException ignored) {
+      }
+      status = client.getStats();
+    }
+  }
+
+  /* This test starts a cluster with the given capacities and racks,
+   * fills it up to 30% full, then adds an empty node and starts balancing.
+   * @param capacities the capacities of the initial datanodes
+   * @param racks the racks of the initial datanodes
+   * @param newCapacity the new node's capacity
+   * @param newRack the new node's rack
+   */
+  private void test(long[] capacities, String[] racks, 
+      long newCapacity, String newRack) throws Exception {
+    int numOfDatanodes = capacities.length;
+    assertEquals(numOfDatanodes, racks.length);
+    cluster = new MiniDFSCluster(0, CONF, capacities.length, true, true, null, 
+        racks, capacities);
+    try {
+      cluster.waitActive();
+      client = DFSClient.createNamenode(
+          DataNode.createSocketAddr(CONF.get("fs.default.name")), CONF);
+
+      long totalCapacity=0L;
+      for(long capacity:capacities) {
+        totalCapacity += capacity;
+      }
+      // fill up the cluster to be 30% full
+      long totalUsedSpace = totalCapacity*3/10;
+      createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
+      // start up an empty node with the same capacity and on the same rack
+      cluster.startDataNodes(CONF, 1, true, null,
+          new String[]{newRack}, new long[]{newCapacity});
+
+      totalCapacity += newCapacity;
+
+      // run balancer and validate results
+      runBalancer(totalUsedSpace, totalCapacity);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /* Start balancer and check if the cluster is balanced after the run */
+  private void runBalancer( long totalUsedSpace, long totalCapacity )
+  throws Exception {
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+    // start rebalancing
+    balancer = new Balancer(CONF);
+    balancer.run(new String[0]);
+
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+    boolean balanced;
+    do {
+      DatanodeInfo[] datanodeReport = 
+        client.getDatanodeReport(DatanodeReportType.ALL);
+      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
+      balanced = true;
+      double avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
+      for(DatanodeInfo datanode:datanodeReport) {
+        if(Math.abs(avgUtilization-
+            ((double)datanode.getDfsUsed())/datanode.getCapacity()*100)>10) {
+          balanced = false;
+          try {
+            Thread.sleep(100);
+          } catch(InterruptedException ignored) {
+          }
+          break;
+        }
+      }
+    } while(!balanced);
+
+  }
+  /** Test a cluster with an even distribution, 
+   * then a new empty node is added to the cluster */
+  public void testBalancer0() throws Exception {
+    /** one-node cluster test*/
+    // add an empty node with half of the CAPACITY & the same rack
+    test(new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+
+    /** two-node cluster test */
+    test(new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2);
+  }
+
+  /** Test unevenly distributed cluster */
+  public void testBalancer1() throws Exception {
+    testUnevenDistribution(
+        new long[] {50*CAPACITY/100, 10*CAPACITY/100},
+        new long[]{CAPACITY, CAPACITY},
+        new String[] {RACK0, RACK1});
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    TestBalancer balancerTest = new TestBalancer();
+    balancerTest.testBalancer0();
+    balancerTest.testBalancer1();
+  }
+}

Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL
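
To make runBalancer's termination condition concrete, consider the one-node
case of testBalancer0: the original node has capacity 500 and is filled to
30%, i.e. 150 bytes used. Adding an empty node of capacity 250 gives a total
capacity of 750, so the average utilization is 150/750 = 20%. The loop in
runBalancer keeps polling datanode reports until every node's utilization is
within 10 percentage points of that average, i.e. between 10% and 30%.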