Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2014/08/08 23:33:57 UTC

svn commit: r1616889 [1/2] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/balancer/ src/test/java/org/apache/hadoop/hdfs/server/balancer/

Author: jing9
Date: Fri Aug  8 21:33:57 2014
New Revision: 1616889

URL: http://svn.apache.org/r1616889
Log:
HDFS-6828. Separate block replica dispatching from Balancer. Contributed by Tsz Wo Nicholas Sze.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
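
In short: the socket handling, thread pools, pending-move queues, and host
include/exclude utilities previously embedded in Balancer move into the new
Dispatcher class, while Balancer keeps the policy logic that classifies
storages and pairs sources with targets. The sketch below is not the
committed code; it is a toy illustration of the resulting delegation
pattern, borrowing only the method names init, dispatchAndCheckContinue,
and shutdownNow from the diff that follows, with every type simplified.

    import java.util.Arrays;
    import java.util.List;

    public class BalancerSketch {
      static class Dispatcher {
        // In the real class: fetch live datanode storage reports from the
        // namenode, filtered by the include/exclude host lists.
        List<String> init() { return Arrays.asList("dn-1", "dn-2"); }

        // In the real class: schedule block moves and report whether any
        // progress was made; false means give up on this iteration.
        boolean dispatchAndCheckContinue() { return false; }

        // In the real class: shut down the mover and dispatcher pools.
        void shutdownNow() { }
      }

      static class Balancer {
        private final Dispatcher dispatcher = new Dispatcher();

        int run() {
          try {
            List<String> reports = dispatcher.init();
            System.out.println("planning moves for " + reports.size()
                + " datanodes");
            // ... classify storages and pair sources with targets ...
            return dispatcher.dispatchAndCheckContinue() ? 0 : 1;
          } finally {
            dispatcher.shutdownNow();
          }
        }
      }

      public static void main(String[] args) {
        System.exit(new Balancer().run());
      }
    }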

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1616889&r1=1616888&r2=1616889&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug  8 21:33:57 2014
@@ -384,6 +384,9 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-573. Porting libhdfs to Windows. (cnauroth)
 
+    HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via
+    jing9)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1616889&r1=1616888&r2=1616889&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Aug  8 21:33:57 2014
@@ -18,19 +18,9 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.PrintStream;
-import java.net.Socket;
 import java.net.URI;
 import java.text.DateFormat;
 import java.util.ArrayList;
@@ -38,20 +28,11 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
-import java.util.EnumMap;
 import java.util.Formatter;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -63,31 +44,15 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.BalancerDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -200,15 +165,7 @@ public class Balancer {
 
   private static final long GB = 1L << 30; //1GB
   private static final long MAX_SIZE_TO_MOVE = 10*GB;
-  private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
 
-  /** The maximum number of concurrent blocks moves for 
-   * balancing purpose at a datanode
-   */
-  private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
-  public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
-  public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
-  
   private static final String USAGE = "Usage: java "
       + Balancer.class.getSimpleName()
       + "\n\t[-policy <policy>]\tthe balancing policy: "
@@ -220,16 +177,9 @@ public class Balancer {
       + "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]"
       + "\tIncludes only the specified datanodes.";
   
-  private final NameNodeConnector nnc;
-  private final KeyManager keyManager;
-
+  private final Dispatcher dispatcher;
   private final BalancingPolicy policy;
-  private final SaslDataTransferClient saslClient;
   private final double threshold;
-  // set of data nodes to be excluded from balancing operations.
-  Set<String> nodesToBeExcluded;
-  //Restrict balancing to the following nodes.
-  Set<String> nodesToBeIncluded;
   
   // all data node lists
   private final Collection<Source> overUtilized = new LinkedList<Source>();
@@ -238,634 +188,6 @@ public class Balancer {
       = new LinkedList<BalancerDatanode.StorageGroup>();
   private final Collection<BalancerDatanode.StorageGroup> underUtilized
       = new LinkedList<BalancerDatanode.StorageGroup>();
-  
-  private final Collection<Source> sources = new HashSet<Source>();
-  private final Collection<BalancerDatanode.StorageGroup> targets
-      = new HashSet<BalancerDatanode.StorageGroup>();
-  
-  private final Map<Block, BalancerBlock> globalBlockList
-                 = new HashMap<Block, BalancerBlock>();
-  private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks;
-
-  /** Map (datanodeUuid,storageType -> StorageGroup) */
-  private final StorageGroupMap storageGroupMap = new StorageGroupMap();
-  
-  private NetworkTopology cluster;
-
-  private final ExecutorService moverExecutor;
-  private final ExecutorService dispatcherExecutor;
-  private final int maxConcurrentMovesPerNode;
-
-
-  private static class StorageGroupMap {
-    private static String toKey(String datanodeUuid, StorageType storageType) {
-      return datanodeUuid + ":" + storageType;
-    }
-
-    private final Map<String, BalancerDatanode.StorageGroup> map
-        = new HashMap<String, BalancerDatanode.StorageGroup>();
-    
-    BalancerDatanode.StorageGroup get(String datanodeUuid, StorageType storageType) {
-      return map.get(toKey(datanodeUuid, storageType));
-    }
-    
-    void put(BalancerDatanode.StorageGroup g) {
-      final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType);
-      final BalancerDatanode.StorageGroup existing = map.put(key, g);
-      Preconditions.checkState(existing == null);
-    }
-    
-    int size() {
-      return map.size();
-    }
-    
-    void clear() {
-      map.clear();
-    }
-  }
-  /* This class keeps track of a scheduled block move */
-  private class PendingBlockMove {
-    private BalancerBlock block;
-    private Source source;
-    private BalancerDatanode proxySource;
-    private BalancerDatanode.StorageGroup target;
-    
-    /** constructor */
-    private PendingBlockMove() {
-    }
-    
-    @Override
-    public String toString() {
-      final Block b = block.getBlock();
-      return b + " with size=" + b.getNumBytes() + " from "
-          + source.getDisplayName() + " to " + target.getDisplayName()
-          + " through " + proxySource.datanode;
-    }
-
-    /* choose a block & a proxy source for this pendingMove 
-     * whose source & target have already been chosen.
-     * 
-     * Return true if a block and its proxy are chosen; false otherwise
-     */
-    private boolean chooseBlockAndProxy() {
-      // iterate all source's blocks until find a good one
-      for (Iterator<BalancerBlock> blocks=
-        source.getBlockIterator(); blocks.hasNext();) {
-        if (markMovedIfGoodBlock(blocks.next())) {
-          blocks.remove();
-          return true;
-        }
-      }
-      return false;
-    }
-    
-    /* Return true if the given block is good for the tentative move;
-     * if it is good, add it to the moved list to mark it as "Moved".
-     * A block is good if
-     * 1. it is a good candidate; see isGoodBlockCandidate
-     * 2. a proxy source that is not busy for this move can be found
-     */
-    private boolean markMovedIfGoodBlock(BalancerBlock block) {
-      synchronized(block) {
-        synchronized(movedBlocks) {
-          if (isGoodBlockCandidate(source, target, block)) {
-            this.block = block;
-            if ( chooseProxySource() ) {
-              movedBlocks.put(block);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Decided to move " + this);
-              }
-              return true;
-            }
-          }
-        }
-      }
-      return false;
-    }
-    
-    /* Now that source, target, and block are chosen, we need to find a proxy
-     * 
-     * @return true if a proxy is found; otherwise false
-     */
-    private boolean chooseProxySource() {
-      final DatanodeInfo targetDN = target.getDatanode();
-      // if node group is supported, first try add nodes in the same node group
-      if (cluster.isNodeGroupAware()) {
-        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
-          if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) {
-            return true;
-          }
-        }
-      }
-      // check if there is replica which is on the same rack with the target
-      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
-        if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
-          return true;
-        }
-      }
-      // find out a non-busy replica
-      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
-        if (addTo(loc)) {
-          return true;
-        }
-      }
-      return false;
-    }
-    
-    /** add to a proxy source for specific block movement */
-    private boolean addTo(BalancerDatanode.StorageGroup g) {
-      final BalancerDatanode bdn = g.getBalancerDatanode();
-      if (bdn.addPendingBlock(this)) {
-        proxySource = bdn;
-        return true;
-      }
-      return false;
-    }
-    
-    /* Dispatch the block move task to the proxy source & wait for the response
-     */
-    private void dispatch() {
-      Socket sock = new Socket();
-      DataOutputStream out = null;
-      DataInputStream in = null;
-      try {
-        sock.connect(
-            NetUtils.createSocketAddr(target.getDatanode().getXferAddr()),
-            HdfsServerConstants.READ_TIMEOUT);
-        /* Unfortunately we don't have a good way to know if the Datanode is
-         * taking a really long time to move a block, OR something has
-         * gone wrong and it's never going to finish. To deal with this 
-         * scenario, we set a long timeout (20 minutes) to avoid hanging
-         * the balancer indefinitely.
-         */
-        sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
-
-        sock.setKeepAlive(true);
-        
-        OutputStream unbufOut = sock.getOutputStream();
-        InputStream unbufIn = sock.getInputStream();
-        ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock());
-        Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb);
-        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, keyManager, accessToken, target.getDatanode());
-        unbufOut = saslStreams.out;
-        unbufIn = saslStreams.in;
-        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            HdfsConstants.IO_FILE_BUFFER_SIZE));
-        in = new DataInputStream(new BufferedInputStream(unbufIn,
-            HdfsConstants.IO_FILE_BUFFER_SIZE));
-        
-        sendRequest(out, eb, StorageType.DEFAULT, accessToken);
-        receiveResponse(in);
-        bytesMoved.addAndGet(block.getNumBytes());
-        LOG.info("Successfully moved " + this);
-      } catch (IOException e) {
-        LOG.warn("Failed to move " + this + ": " + e.getMessage());
-        /* The proxy or target may have an issue, so insert a small delay
-         * before using these nodes further. This avoids a potential storm
-         * of "threads quota exceeded" warnings when the balancer
-         * gets out of sync with work going on in the datanode.
-         */
-        proxySource.activateDelay(DELAY_AFTER_ERROR);
-        target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR);
-      } finally {
-        IOUtils.closeStream(out);
-        IOUtils.closeStream(in);
-        IOUtils.closeSocket(sock);
-        
-        proxySource.removePendingBlock(this);
-        target.getBalancerDatanode().removePendingBlock(this);
-
-        synchronized (this ) {
-          reset();
-        }
-        synchronized (Balancer.this) {
-          Balancer.this.notifyAll();
-        }
-      }
-    }
-    
-    /* Send a block replace request to the output stream*/
-    private void sendRequest(DataOutputStream out, ExtendedBlock eb,
-        StorageType storageType, 
-        Token<BlockTokenIdentifier> accessToken) throws IOException {
-      new Sender(out).replaceBlock(eb, storageType, accessToken,
-          source.getDatanode().getDatanodeUuid(), proxySource.datanode);
-    }
-    
-    /* Receive a block copy response from the input stream */ 
-    private void receiveResponse(DataInputStream in) throws IOException {
-      BlockOpResponseProto response = BlockOpResponseProto.parseFrom(
-          vintPrefixed(in));
-      if (response.getStatus() != Status.SUCCESS) {
-        if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
-          throw new IOException("block move failed due to access token error");
-        throw new IOException("block move is failed: " +
-            response.getMessage());
-      }
-    }
-
-    /* reset the object */
-    private void reset() {
-      block = null;
-      source = null;
-      proxySource = null;
-      target = null;
-    }
-    
-    /* start a thread to dispatch the block move */
-    private void scheduleBlockMove() {
-      moverExecutor.execute(new Runnable() {
-        @Override
-        public void run() {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Start moving " + PendingBlockMove.this);
-          }
-          dispatch();
-        }
-      });
-    }
-  }
-  
-  /* A class for keeping track of blocks in the Balancer */
-  static class BalancerBlock extends MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
-    BalancerBlock(Block block) {
-      super(block);
-    }
-  }
-  
-  /* This class represents a desired move of bytes from a source node
-   * to a target node.
-   * An object of this class is stored in a source.
-   */
-  static private class Task {
-    private final BalancerDatanode.StorageGroup target;
-    private long size;  //bytes scheduled to move
-    
-    /* constructor */
-    private Task(BalancerDatanode.StorageGroup target, long size) {
-      this.target = target;
-      this.size = size;
-    }
-  }
-  
-  
-  /* A class that keeps track of a datanode in Balancer */
-  private static class BalancerDatanode {
-    
-    /** A group of storages in a datanode with the same storage type. */
-    private class StorageGroup {
-      final StorageType storageType;
-      final double utilization;
-      final long maxSize2Move;
-      private long scheduledSize = 0L;
-
-      private StorageGroup(StorageType storageType, double utilization,
-          long maxSize2Move) {
-        this.storageType = storageType;
-        this.utilization = utilization;
-        this.maxSize2Move = maxSize2Move;
-      }
-      
-      BalancerDatanode getBalancerDatanode() {
-        return BalancerDatanode.this;
-      }
-
-      DatanodeInfo getDatanode() {
-        return BalancerDatanode.this.datanode;
-      }
-
-      /** Decide if this storage group still needs to move more bytes */
-      protected synchronized boolean hasSpaceForScheduling() {
-        return availableSizeToMove() > 0L;
-      }
-
-      /** @return the total number of bytes that need to be moved */
-      synchronized long availableSizeToMove() {
-        return maxSize2Move - scheduledSize;
-      }
-      
-      /** increment scheduled size */
-      synchronized void incScheduledSize(long size) {
-        scheduledSize += size;
-      }
-      
-      /** @return scheduled size */
-      synchronized long getScheduledSize() {
-        return scheduledSize;
-      }
-      
-      /** Reset scheduled size to zero. */
-      synchronized void resetScheduledSize() {
-        scheduledSize = 0L;
-      }
-
-      /** @return the name for display */
-      String getDisplayName() {
-        return datanode + ":" + storageType;
-      }
-      
-      @Override
-      public String toString() {
-        return "" + utilization;
-      }
-    }
-
-    final DatanodeInfo datanode;
-    final EnumMap<StorageType, StorageGroup> storageMap
-        = new EnumMap<StorageType, StorageGroup>(StorageType.class);
-    protected long delayUntil = 0L;
-    //  blocks being moved but not confirmed yet
-    private final List<PendingBlockMove> pendingBlocks;
-    private final int maxConcurrentMoves;
-    
-    @Override
-    public String toString() {
-      return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
-    }
-
-    /* Constructor 
-     * Depending on avgutil & threshold, calculate maximum bytes to move 
-     */
-    private BalancerDatanode(DatanodeStorageReport report,
-        double threshold, int maxConcurrentMoves) {
-      this.datanode = report.getDatanodeInfo();
-      this.maxConcurrentMoves = maxConcurrentMoves;
-      this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
-    }
-    
-    private void put(StorageType storageType, StorageGroup g) {
-      final StorageGroup existing = storageMap.put(storageType, g);
-      Preconditions.checkState(existing == null);
-    }
-
-    StorageGroup addStorageGroup(StorageType storageType, double utilization,
-        long maxSize2Move) {
-      final StorageGroup g = new StorageGroup(storageType, utilization,
-          maxSize2Move);
-      put(storageType, g);
-      return g;
-    }
-
-    Source addSource(StorageType storageType, double utilization,
-        long maxSize2Move, Balancer balancer) {
-      final Source s = balancer.new Source(storageType, utilization,
-          maxSize2Move, this);
-      put(storageType, s);
-      return s;
-    }
-
-    synchronized private void activateDelay(long delta) {
-      delayUntil = Time.now() + delta;
-    }
-
-    synchronized private boolean isDelayActive() {
-      if (delayUntil == 0 || Time.now() > delayUntil){
-        delayUntil = 0;
-        return false;
-      }
-        return true;
-    }
-    
-    /* Check if the node can schedule more blocks to move */
-    synchronized private boolean isPendingQNotFull() {
-      if ( pendingBlocks.size() < this.maxConcurrentMoves ) {
-        return true;
-      }
-      return false;
-    }
-    
-    /* Check if all the dispatched moves are done */
-    synchronized private boolean isPendingQEmpty() {
-      return pendingBlocks.isEmpty();
-    }
-    
-    /* Add a scheduled block move to the node */
-    private synchronized boolean addPendingBlock(
-        PendingBlockMove pendingBlock) {
-      if (!isDelayActive() && isPendingQNotFull()) {
-        return pendingBlocks.add(pendingBlock);
-      }
-      return false;
-    }
-    
-    /* Remove a scheduled block move from the node */
-    private synchronized boolean  removePendingBlock(
-        PendingBlockMove pendingBlock) {
-      return pendingBlocks.remove(pendingBlock);
-    }
-  }
-
-  /** A node that can be the sources of a block move */
-  private class Source extends BalancerDatanode.StorageGroup {
-    
-    /* A thread that initiates a block move 
-     * and waits for block move to complete */
-    private class BlockMoveDispatcher implements Runnable {
-      @Override
-      public void run() {
-        dispatchBlocks();
-      }
-    }
-    
-    private final List<Task> tasks = new ArrayList<Task>(2);
-    private long blocksToReceive = 0L;
-    /* source blocks point to balancerBlocks in the global list because
-     * we want to keep one copy of a block in balancer and be aware that
-     * the locations are changing over time.
-     */
-    private final List<BalancerBlock> srcBlockList
-            = new ArrayList<BalancerBlock>();
-    
-    /* constructor */
-    private Source(StorageType storageType, double utilization,
-        long maxSize2Move, BalancerDatanode dn) {
-      dn.super(storageType, utilization, maxSize2Move);
-    }
-    
-    /** Add a task */
-    private void addTask(Task task) {
-      Preconditions.checkState(task.target != this,
-          "Source and target are the same storage group " + getDisplayName());
-      incScheduledSize(task.size);
-      tasks.add(task);
-    }
-    
-    /* Return an iterator to this source's blocks */
-    private Iterator<BalancerBlock> getBlockIterator() {
-      return srcBlockList.iterator();
-    }
-    
-    /* Fetch new blocks of this source from the namenode and
-     * update this source's block list & the global block list.
-     * Return the total size of the received blocks in bytes.
-     */
-    private long getBlockList() throws IOException {
-      final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
-      final BlockWithLocations[] newBlocks = nnc.getNamenode().getBlocks(
-          getDatanode(), size).getBlocks();
-
-      long bytesReceived = 0;
-      for (BlockWithLocations blk : newBlocks) {
-        bytesReceived += blk.getBlock().getNumBytes();
-        BalancerBlock block;
-        synchronized(globalBlockList) {
-          block = globalBlockList.get(blk.getBlock());
-          if (block==null) {
-            block = new BalancerBlock(blk.getBlock());
-            globalBlockList.put(blk.getBlock(), block);
-          } else {
-            block.clearLocations();
-          }
-        
-          synchronized (block) {
-            // update locations
-            final String[] datanodeUuids = blk.getDatanodeUuids();
-            final StorageType[] storageTypes = blk.getStorageTypes();
-            for (int i = 0; i < datanodeUuids.length; i++) {
-              final BalancerDatanode.StorageGroup g = storageGroupMap.get(
-                  datanodeUuids[i], storageTypes[i]);
-              if (g != null) { // not unknown
-                block.addLocation(g);
-              }
-            }
-          }
-          if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) {
-            // filter bad candidates
-            srcBlockList.add(block);
-          }
-        }
-      }
-      return bytesReceived;
-    }
-
-    /* Decide if the given block is a good candidate to move or not */
-    private boolean isGoodBlockCandidate(BalancerBlock block) {
-      for (Task t : tasks) {
-        if (Balancer.this.isGoodBlockCandidate(this, t.target, block)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /* Return a block that's good for the source thread to dispatch immediately.
-     * The block's source, target, and proxy source are determined too.
-     * When choosing proxy and target, source & target throttling
-     * has been considered. They are chosen only when they have the capacity
-     * to support this block move.
-     * The block should be dispatched immediately after this method returns.
-     */
-    private PendingBlockMove chooseNextBlockToMove() {
-      for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
-        final Task task = i.next();
-        final BalancerDatanode target = task.target.getBalancerDatanode();
-        PendingBlockMove pendingBlock = new PendingBlockMove();
-        if (target.addPendingBlock(pendingBlock)) { 
-          // target is not busy, so do a tentative block allocation
-          pendingBlock.source = this;
-          pendingBlock.target = task.target;
-          if ( pendingBlock.chooseBlockAndProxy() ) {
-            long blockSize = pendingBlock.block.getNumBytes();
-            incScheduledSize(-blockSize);
-            task.size -= blockSize;
-            if (task.size == 0) {
-              i.remove();
-            }
-            return pendingBlock;
-          } else {
-            // cancel the tentative move
-            target.removePendingBlock(pendingBlock);
-          }
-        }
-      }
-      return null;
-    }
-
-    /* iterate all source's blocks to remove moved ones */    
-    private void filterMovedBlocks() {
-      for (Iterator<BalancerBlock> blocks=getBlockIterator();
-            blocks.hasNext();) {
-        if (movedBlocks.contains(blocks.next().getBlock())) {
-          blocks.remove();
-        }
-      }
-    }
-    
-    private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5;
-    /* Return if should fetch more blocks from namenode */
-    private boolean shouldFetchMoreBlocks() {
-      return srcBlockList.size()<SOURCE_BLOCK_LIST_MIN_SIZE &&
-                 blocksToReceive>0;
-    }
-    
-    /* This method iteratively does the following:
-     * it first selects a block to move,
-     * then sends a request to the proxy source to start the block move;
-     * when the source's block list falls below a threshold, it asks
-     * the namenode for more blocks.
-     * It terminates when it has dispatched enough block move tasks, or
-     * it has received enough blocks from the namenode, or 
-     * the elapsed time of the iteration has exceeded the max time limit.
-     */ 
-    private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
-    private void dispatchBlocks() {
-      long startTime = Time.now();
-      long scheduledSize = getScheduledSize();
-      this.blocksToReceive = 2*scheduledSize;
-      boolean isTimeUp = false;
-      int noPendingBlockIteration = 0;
-      while(!isTimeUp && getScheduledSize()>0 &&
-          (!srcBlockList.isEmpty() || blocksToReceive>0)) {
-        PendingBlockMove pendingBlock = chooseNextBlockToMove();
-        if (pendingBlock != null) {
-          // move the block
-          pendingBlock.scheduleBlockMove();
-          continue;
-        }
-        
-        /* Since we can not schedule any block to move,
-         * filter any moved blocks from the source block list and
-         * check if we should fetch more blocks from the namenode
-         */
-        filterMovedBlocks(); // filter already moved blocks
-        if (shouldFetchMoreBlocks()) {
-          // fetch new blocks
-          try {
-            blocksToReceive -= getBlockList();
-            continue;
-          } catch (IOException e) {
-            LOG.warn("Exception while getting block list", e);
-            return;
-          }
-        } else {
-          // source node cannot find a pendingBlockToMove, iteration +1
-          noPendingBlockIteration++;
-          // in case no blocks can be moved for source node's task,
-          // jump out of while-loop after 5 iterations.
-          if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
-            resetScheduledSize();
-          }
-        }
-        
-        // check if time is up or not
-        if (Time.now()-startTime > MAX_ITERATION_TIME) {
-          isTimeUp = true;
-          continue;
-        }
-        
-        /* Now we can not schedule any block to move and there are
-         * no new blocks added to the source block list, so we wait. 
-         */
-        try {
-          synchronized(Balancer.this) {
-            Balancer.this.wait(1000);  // wait for targets/sources to be idle
-          }
-        } catch (InterruptedException ignored) {
-        }
-      }
-    }
-  }
 
   /* Check that this Balancer is compatible with the Block Placement Policy
    * used by the Namenode.
@@ -887,38 +209,12 @@ public class Balancer {
    * when connection fails.
    */
   Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
+    this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
+        p.nodesToBeExcluded, conf);
     this.threshold = p.threshold;
     this.policy = p.policy;
-    this.nodesToBeExcluded = p.nodesToBeExcluded;
-    this.nodesToBeIncluded = p.nodesToBeIncluded;
-    this.nnc = theblockpool;
-    this.keyManager = nnc.getKeyManager();
-    
-    final long movedWinWidth = conf.getLong(
-        DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 
-        DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
-    movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
-
-    cluster = NetworkTopology.getInstance(conf);
-
-    this.moverExecutor = Executors.newFixedThreadPool(
-            conf.getInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
-                        DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT));
-    this.dispatcherExecutor = Executors.newFixedThreadPool(
-            conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
-                        DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
-    this.maxConcurrentMovesPerNode =
-        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
-        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
-    this.saslClient = new SaslDataTransferClient(
-      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
-      TrustedChannelResolver.getInstance(conf),
-      conf.getBoolean(
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
   }
   
-  
   private static long getCapacity(DatanodeStorageReport report, StorageType t) {
     long capacity = 0L;
     for(StorageReport r : report.getStorageReports()) {
@@ -939,26 +235,6 @@ public class Balancer {
     return remaining;
   }
 
-  private boolean shouldIgnore(DatanodeInfo dn) {
-    //ignore decommissioned nodes
-    final boolean decommissioned = dn.isDecommissioned();
-    //ignore decommissioning nodes
-    final boolean decommissioning = dn.isDecommissionInProgress();
-    // ignore nodes in exclude list
-    final boolean excluded = Util.shouldBeExcluded(nodesToBeExcluded, dn);
-    // ignore nodes not in the include list (if include list is not empty)
-    final boolean notIncluded = !Util.shouldBeIncluded(nodesToBeIncluded, dn);
-    
-    if (decommissioned || decommissioning || excluded || notIncluded) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
-            + decommissioning + ", " + excluded + ", " + notIncluded);
-      }
-      return true;
-    }
-    return false;
-  }
-
   /**
    * Given a datanode storage set, build a network topology and decide
    * over-utilized storages, above average utilized storages, 
@@ -966,16 +242,11 @@ public class Balancer {
    * The input datanode storage set is shuffled in order to randomize
    * to the storage matching later on.
    *
-   * @return the total number of bytes that are 
-   *                needed to move to make the cluster balanced.
-   * @param reports a set of datanode storage reports
+   * @return the number of bytes needed to move in order to balance the cluster.
    */
-  private long init(DatanodeStorageReport[] reports) {
+  private long init(List<DatanodeStorageReport> reports) {
     // compute average utilization
     for (DatanodeStorageReport r : reports) {
-      if (shouldIgnore(r.getDatanodeInfo())) {
-        continue; 
-      }
       policy.accumulateSpaces(r);
     }
     policy.initAvgUtilization();
@@ -983,15 +254,8 @@ public class Balancer {
     // create network topology and classify utilization collections: 
     //   over-utilized, above-average, below-average and under-utilized.
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
-    for(DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
-      final DatanodeInfo datanode = r.getDatanodeInfo();
-      if (shouldIgnore(datanode)) {
-        continue; // ignore decommissioning or decommissioned nodes
-      }
-      cluster.add(datanode);
-
-      final BalancerDatanode dn = new BalancerDatanode(r, underLoadedBytes,
-          maxConcurrentMovesPerNode);
+    for(DatanodeStorageReport r : reports) {
+      final BalancerDatanode dn = dispatcher.newDatanode(r);
       for(StorageType t : StorageType.asList()) {
         final Double utilization = policy.getUtilization(r, t);
         if (utilization == null) { // datanode does not have such storage type 
@@ -1006,7 +270,7 @@ public class Balancer {
 
         final BalancerDatanode.StorageGroup g;
         if (utilizationDiff > 0) {
-          final Source s = dn.addSource(t, utilization, maxSize2Move, this);
+          final Source s = dn.addSource(t, utilization, maxSize2Move, dispatcher);
           if (thresholdDiff <= 0) { // within threshold
             aboveAvgUtilized.add(s);
           } else {
@@ -1023,14 +287,15 @@ public class Balancer {
             underUtilized.add(g);
           }
         }
-        storageGroupMap.put(g);
+        dispatcher.getStorageGroupMap().put(g);
       }
     }
 
     logUtilizationCollections();
     
-    Preconditions.checkState(storageGroupMap.size() == overUtilized.size()
-        + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(),
+    Preconditions.checkState(dispatcher.getStorageGroupMap().size()
+        == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
+           + belowAvgUtilized.size(),
         "Mismatched number of storage groups");
     
     // return number of bytes to be moved in order to make the cluster balanced
@@ -1077,7 +342,7 @@ public class Balancer {
    */
   private long chooseStorageGroups() {
     // First, match nodes on the same node group if cluster is node group aware
-    if (cluster.isNodeGroupAware()) {
+    if (dispatcher.getCluster().isNodeGroupAware()) {
       chooseStorageGroups(Matcher.SAME_NODE_GROUP);
     }
     
@@ -1086,15 +351,7 @@ public class Balancer {
     // At last, match all remaining nodes
     chooseStorageGroups(Matcher.ANY_OTHER);
     
-    Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
-        "Mismatched number of datanodes (" + storageGroupMap.size() + " < "
-            + sources.size() + " sources, " + targets.size() + " targets)");
-
-    long bytesToMove = 0L;
-    for (Source src : sources) {
-      bytesToMove += src.getScheduledSize();
-    }
-    return bytesToMove;
+    return dispatcher.bytesToMove();
   }
 
   /** Decide all <source, target> pairs according to the matcher. */
@@ -1166,9 +423,8 @@ public class Balancer {
     long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
     final Task task = new Task(target, size);
     source.addTask(task);
-    target.incScheduledSize(task.size);
-    sources.add(source);
-    targets.add(target);
+    target.incScheduledSize(task.getSize());
+    dispatcher.add(source, target);
     LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
         + source.getDisplayName() + " to " + target.getDisplayName());
   }
@@ -1182,7 +438,8 @@ public class Balancer {
         final C c = candidates.next();
         if (!c.hasSpaceForScheduling()) {
           candidates.remove();
-        } else if (matcher.match(cluster, g.getDatanode(), c.getDatanode())) {
+        } else if (matcher.match(dispatcher.getCluster(),
+            g.getDatanode(), c.getDatanode())) {
           return c;
         }
       }
@@ -1190,172 +447,16 @@ public class Balancer {
     return null;
   }
 
-  private final AtomicLong bytesMoved = new AtomicLong();
-  
-  /* Start a thread to dispatch block moves for each source. 
-   * The thread selects blocks to move & sends a request to the proxy source
-   * to initiate the block move. The process is flow controlled. Block selection is
-   * blocked if there are too many un-confirmed block moves.
-   * Return the total number of bytes successfully moved in this iteration.
-   */
-  private long dispatchBlockMoves() throws InterruptedException {
-    long bytesLastMoved = bytesMoved.get();
-    Future<?>[] futures = new Future<?>[sources.size()];
-    int i=0;
-    for (Source source : sources) {
-      futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
-    }
-    
-    // wait for all dispatcher threads to finish
-    for (Future<?> future : futures) {
-      try {
-        future.get();
-      } catch (ExecutionException e) {
-        LOG.warn("Dispatcher thread failed", e.getCause());
-      }
-    }
-    
-    // wait for all block moving to be done
-    waitForMoveCompletion();
-    
-    return bytesMoved.get()-bytesLastMoved;
-  }
-  
-  // The sleeping period before checking if block move is completed again
-  static private long blockMoveWaitTime = 30000L;
-  
-  /** set the sleeping period for block move completion check */
-  static void setBlockMoveWaitTime(long time) {
-    blockMoveWaitTime = time;
-  }
-  
-  /* wait for all block move confirmations 
-   * by checking each target's pendingMove queue 
-   */
-  private void waitForMoveCompletion() {
-    boolean shouldWait;
-    do {
-      shouldWait = false;
-      for (BalancerDatanode.StorageGroup target : targets) {
-        if (!target.getBalancerDatanode().isPendingQEmpty()) {
-          shouldWait = true;
-          break;
-        }
-      }
-      if (shouldWait) {
-        try {
-          Thread.sleep(blockMoveWaitTime);
-        } catch (InterruptedException ignored) {
-        }
-      }
-    } while (shouldWait);
-  }
-
-  /* Decide if it is OK to move the given block from source to target
-   * A block is a good candidate if
-   * 1. the block is not in the process of being moved/has not been moved;
-   * 2. the block does not have a replica on the target;
-   * 3. doing the move does not reduce the number of racks that the block has
-   */
-  private boolean isGoodBlockCandidate(Source source, 
-      BalancerDatanode.StorageGroup target, BalancerBlock block) {
-    if (source.storageType != target.storageType) {
-      return false;
-    }
-    // check if the block is moved or not
-    if (movedBlocks.contains(block.getBlock())) {
-      return false;
-    }
-    if (block.isLocatedOn(target)) {
-      return false;
-    }
-    if (cluster.isNodeGroupAware() && 
-        isOnSameNodeGroupWithReplicas(target, block, source)) {
-      return false;
-    }
-
-    boolean goodBlock = false;
-    if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
-      // good if source and target are on the same rack
-      goodBlock = true;
-    } else {
-      boolean notOnSameRack = true;
-      synchronized (block) {
-        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
-          if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
-            notOnSameRack = false;
-            break;
-          }
-        }
-      }
-      if (notOnSameRack) {
-        // good if target is not on the same rack as any replica
-        goodBlock = true;
-      } else {
-        // good if source is on the same rack as one of the replicas
-        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
-          if (loc != source && 
-              cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
-            goodBlock = true;
-            break;
-          }
-        }
-      }
-    }
-    return goodBlock;
-  }
-
-  /**
-   * Check if there is any replica (other than source) on the same node group
-   * as the target. If so, the target is not a good candidate for placing the
-   * block replica, since we don't want 2 replicas under the same node group
-   * after balancing.
-   * @param target targetDataNode
-   * @param block dataBlock
-   * @param source sourceDataNode
-   * @return true if there is any replica (other than source) on the same node
-   * group as the target
-   */
-  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
-      BalancerBlock block, Source source) {
-    final DatanodeInfo targetDn = target.getDatanode();
-    for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
-      if (loc != source && 
-          cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /* reset all fields in a balancer preparing for the next iteration */
   private void resetData(Configuration conf) {
-    this.cluster = NetworkTopology.getInstance(conf);
     this.overUtilized.clear();
     this.aboveAvgUtilized.clear();
     this.belowAvgUtilized.clear();
     this.underUtilized.clear();
-    this.storageGroupMap.clear();
-    this.sources.clear();
-    this.targets.clear();  
     this.policy.reset();
-    cleanGlobalBlockList();
-    this.movedBlocks.cleanup();
+    dispatcher.reset(conf);
   }
   
-  /* Remove all blocks from the global block list except for the ones in the
-   * moved list.
-   */
-  private void cleanGlobalBlockList() {
-    for (Iterator<Block> globalBlockListIterator=globalBlockList.keySet().iterator();
-    globalBlockListIterator.hasNext();) {
-      Block block = globalBlockListIterator.next();
-      if(!movedBlocks.contains(block)) {
-        globalBlockListIterator.remove();
-      }
-    }
-  }
-
   // Exit status
   enum ReturnStatus {
     // These int values will map directly to the balancer process's exit code.
@@ -1379,11 +480,8 @@ public class Balancer {
   private ReturnStatus run(int iteration, Formatter formatter,
       Configuration conf) {
     try {
-      /* get all live datanodes of a cluster and their disk usage,
-       * and decide the number of bytes that need to be moved
-       */
-      final long bytesLeftToMove = init(
-          nnc.getClient().getDatanodeStorageReport(DatanodeReportType.LIVE));
+      final List<DatanodeStorageReport> reports = dispatcher.init();
+      final long bytesLeftToMove = init(reports);
       if (bytesLeftToMove == 0) {
         System.out.println("The cluster is balanced. Exiting...");
         return ReturnStatus.SUCCESS;
@@ -1409,7 +507,7 @@ public class Balancer {
       formatter.format("%-24s %10d  %19s  %18s  %17s%n",
           DateFormat.getDateTimeInstance().format(new Date()),
           iteration,
-          StringUtils.byteDesc(bytesMoved.get()),
+          StringUtils.byteDesc(dispatcher.getBytesMoved()),
           StringUtils.byteDesc(bytesLeftToMove),
           StringUtils.byteDesc(bytesToMove)
           );
@@ -1420,7 +518,7 @@ public class Balancer {
        * available to move.
       * Exit if no byte has been moved for 5 consecutive iterations.
        */
-      if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
+      if (!dispatcher.dispatchAndCheckContinue()) {
         return ReturnStatus.NO_MOVE_PROGRESS;
       }
 
@@ -1435,9 +533,7 @@ public class Balancer {
       System.out.println(e + ".  Exiting ...");
       return ReturnStatus.INTERRUPTED;
     } finally {
-      // shutdown thread pools
-      dispatcherExecutor.shutdownNow();
-      moverExecutor.shutdownNow();
+      dispatcher.shutdownNow();
     }
   }
 
@@ -1546,76 +642,6 @@ public class Balancer {
     }
   }
 
-  static class Util {
-
-    /**
-     * @param datanode
-     * @return returns true if data node is part of the excludedNodes.
-     */
-    static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) {
-      return isIn(excludedNodes, datanode);
-    }
-
-    /**
-     * @param datanode
-     * @return returns true if includedNodes is empty or data node is part of the includedNodes.
-     */
-    static boolean shouldBeIncluded(Set<String> includedNodes, DatanodeInfo datanode) {
-      return (includedNodes.isEmpty() ||
-          isIn(includedNodes, datanode));
-    }
-    /**
-     * Match is checked using the host name or IP address, with and without the port number.
-     * @param datanodeSet
-     * @param datanode
-     * @return true if the datanode's transfer address matches the set of nodes.
-     */
-    private static boolean isIn(Set<String> datanodeSet, DatanodeInfo datanode) {
-      return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) ||
-          isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) ||
-          isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort());
-    }
-
-    /**
-     * returns true if nodes contains host or host:port
-     * @param nodes
-     * @param host
-     * @param port
-     * @return
-     */
-    private static boolean isIn(Set<String> nodes, String host, int port) {
-      if (host == null) {
-        return false;
-      }
-      return (nodes.contains(host) || nodes.contains(host +":"+ port));
-    }
-
-    /**
-     * parse a comma separated string to obtain set of host names
-     * @param string
-     * @return
-     */
-    static Set<String> parseHostList(String string) {
-      String[] addrs = StringUtils.getTrimmedStrings(string);
-      return new HashSet<String>(Arrays.asList(addrs));
-    }
-
-    /**
-     * read set of host names from a file
-     * @param fileName
-     * @return
-     */
-    static Set<String> getHostListFromFile(String fileName) {
-      Set<String> nodes = new HashSet <String> ();
-      try {
-        HostsFileReader.readFileToSet("nodes", fileName, nodes);
-        return StringUtils.getTrimmedStrings(nodes);
-      } catch (IOException e) {
-        throw new IllegalArgumentException("Unable to open file: " + fileName);
-      }
-    }
-  }
-
   static class Cli extends Configured implements Tool {
     /**
      * Parse arguments and then run Balancer.
@@ -1688,7 +714,7 @@ public class Balancer {
                 checkArgument(++i < args.length,
                     "File containing nodes to exclude is not specified: args = "
                     + Arrays.toString(args));
-                nodesTobeExcluded = Util.getHostListFromFile(args[i]);
+                nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude");
               } else {
                 nodesTobeExcluded = Util.parseHostList(args[i]);
               }
@@ -1700,7 +726,7 @@ public class Balancer {
                 checkArgument(++i < args.length,
                     "File containing nodes to include is not specified: args = "
                     + Arrays.toString(args));
-                nodesTobeIncluded = Util.getHostListFromFile(args[i]);
+                nodesTobeIncluded = Util.getHostListFromFile(args[i], "include");
                } else {
                 nodesTobeIncluded = Util.parseHostList(args[i]);
               }
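
Before the new file: the Util helpers deleted above come back as
Dispatcher.Util (see the Dispatcher.Util import near the top of
Balancer.java), and getHostListFromFile now takes a second argument; the
call sites above pass "exclude" and "include". The new method body is in
part 2 of this commit mail, so the following is only a hedged sketch,
assuming the extra argument merely replaces the hard-coded "nodes" label
that the removed version passed to HostsFileReader:

    // Sketch only, not the committed code: same logic as the removed
    // Balancer.Util.getHostListFromFile, with the caller-supplied label
    // ("include" or "exclude") passed through to HostsFileReader.
    static Set<String> getHostListFromFile(String fileName, String type) {
      Set<String> nodes = new HashSet<String>();
      try {
        HostsFileReader.readFileToSet(type, fileName, nodes);
        return StringUtils.getTrimmedStrings(nodes);
      } catch (IOException e) {
        throw new IllegalArgumentException("Unable to open file: " + fileName);
      }
    }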

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1616889&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Fri Aug  8 21:33:57 2014
@@ -0,0 +1,1060 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
+
+/** Dispatching block replica moves between datanodes. */
+@InterfaceAudience.Private
+public class Dispatcher {
+  static final Log LOG = LogFactory.getLog(Dispatcher.class);
+
+  private static final long GB = 1L << 30; // 1GB
+  private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB;
+
+  private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
+  private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
+  private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20
+                                                                     // minutes
+
+  private final NameNodeConnector nnc;
+  private final KeyManager keyManager;
+  private final SaslDataTransferClient saslClient;
+
+  /** Set of datanodes to be excluded. */
+  private final Set<String> excludedNodes;
+  /** Restrict to the following nodes. */
+  private final Set<String> includedNodes;
+
+  private final Collection<Source> sources = new HashSet<Source>();
+  private final Collection<BalancerDatanode.StorageGroup> targets
+      = new HashSet<BalancerDatanode.StorageGroup>();
+
+  private final GlobalBlockMap globalBlocks = new GlobalBlockMap();
+  private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks;
+
+  /** Map (datanodeUuid,storageType -> StorageGroup) */
+  private final StorageGroupMap storageGroupMap = new StorageGroupMap();
+
+  private NetworkTopology cluster;
+
+  private final ExecutorService moveExecutor;
+  private final ExecutorService dispatchExecutor;
+  /** The maximum number of concurrent blocks moves at a datanode */
+  private final int maxConcurrentMovesPerNode;
+
+  private final AtomicLong bytesMoved = new AtomicLong();
+
+  private static class GlobalBlockMap {
+    private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
+
+    /**
+     * Get the block from the map;
+     * if the block is not found, create a new block and put it in the map.
+     */
+    private DBlock get(Block b) {
+      DBlock block = map.get(b);
+      if (block == null) {
+        block = new DBlock(b);
+        map.put(b, block);
+      }
+      return block;
+    }
+    
+    /** Remove all blocks except for the moved blocks. */
+    private void removeAllButRetain(
+        MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks) {
+      for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) {
+        if (!movedBlocks.contains(i.next())) {
+          i.remove();
+        }
+      }
+    }
+  }
+
+  static class StorageGroupMap {
+    private static String toKey(String datanodeUuid, StorageType storageType) {
+      return datanodeUuid + ":" + storageType;
+    }
+
+    private final Map<String, BalancerDatanode.StorageGroup> map
+        = new HashMap<String, BalancerDatanode.StorageGroup>();
+
+    BalancerDatanode.StorageGroup get(String datanodeUuid,
+        StorageType storageType) {
+      return map.get(toKey(datanodeUuid, storageType));
+    }
+
+    void put(BalancerDatanode.StorageGroup g) {
+      final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType);
+      final BalancerDatanode.StorageGroup existing = map.put(key, g);
+      Preconditions.checkState(existing == null);
+    }
+
+    int size() {
+      return map.size();
+    }
+
+    void clear() {
+      map.clear();
+    }
+  }
+
+  /** This class keeps track of a scheduled block move. */
+  private class PendingMove {
+    private DBlock block;
+    private Source source;
+    private BalancerDatanode proxySource;
+    private BalancerDatanode.StorageGroup target;
+
+    private PendingMove() {
+    }
+
+    @Override
+    public String toString() {
+      final Block b = block.getBlock();
+      return b + " with size=" + b.getNumBytes() + " from "
+          + source.getDisplayName() + " to " + target.getDisplayName()
+          + " through " + proxySource.datanode;
+    }
+
+    /**
+     * Choose a block and a proxy source for this PendingMove; the source and
+     * the target have already been chosen.
+     * 
+     * @return true if a block and its proxy are chosen; false otherwise
+     */
+    private boolean chooseBlockAndProxy() {
+      // iterate over the source's blocks until a good one is found
+      for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) {
+        if (markMovedIfGoodBlock(i.next())) {
+          i.remove();
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * If the given block is a good candidate and a proxy source can be found,
+     * mark the block as moved and use it for this pending move.
+     * 
+     * @return true if the block is chosen for the tentative move.
+     */
+    private boolean markMovedIfGoodBlock(DBlock block) {
+      synchronized (block) {
+        synchronized (movedBlocks) {
+          if (isGoodBlockCandidate(source, target, block)) {
+            this.block = block;
+            if (chooseProxySource()) {
+              movedBlocks.put(block);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Decided to move " + this);
+              }
+              return true;
+            }
+          }
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Choose a proxy source.
+     * 
+     * @return true if a proxy is found; otherwise false
+     */
+    private boolean chooseProxySource() {
+      final DatanodeInfo targetDN = target.getDatanode();
+      // if the cluster is node-group aware, first try a replica in the same
+      // node group as the target
+      if (cluster.isNodeGroupAware()) {
+        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
+          if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)
+              && addTo(loc)) {
+            return true;
+          }
+        }
+      }
+      // then try a replica on the same rack as the target
+      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
+        if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
+          return true;
+        }
+      }
+      // finally, fall back to any non-busy replica
+      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
+        if (addTo(loc)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /** Try to use the given storage group's datanode as the proxy source. */
+    private boolean addTo(BalancerDatanode.StorageGroup g) {
+      final BalancerDatanode bdn = g.getBalancerDatanode();
+      if (bdn.addPendingBlock(this)) {
+        proxySource = bdn;
+        return true;
+      }
+      return false;
+    }
+
+    /** Dispatch the move to the proxy source and wait for the response. */
+    private void dispatch() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Start moving " + this);
+      }
+
+      Socket sock = new Socket();
+      DataOutputStream out = null;
+      DataInputStream in = null;
+      try {
+        sock.connect(
+            NetUtils.createSocketAddr(target.getDatanode().getXferAddr()),
+            HdfsServerConstants.READ_TIMEOUT);
+        /*
+         * Unfortunately we don't have a good way to know if the Datanode is
+         * taking a really long time to move a block, OR something has gone
+         * wrong and it's never going to finish. To deal with this scenario, we
+         * set a long timeout (20 minutes) to avoid hanging the balancer
+         * indefinitely.
+         */
+        sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
+
+        sock.setKeepAlive(true);
+
+        OutputStream unbufOut = sock.getOutputStream();
+        InputStream unbufIn = sock.getInputStream();
+        ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
+            block.getBlock());
+        Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb);
+        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+            unbufIn, keyManager, accessToken, target.getDatanode());
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+        in = new DataInputStream(new BufferedInputStream(unbufIn,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+
+        sendRequest(out, eb, accessToken);
+        receiveResponse(in);
+        bytesMoved.addAndGet(block.getNumBytes());
+        LOG.info("Successfully moved " + this);
+      } catch (IOException e) {
+        LOG.warn("Failed to move " + this + ": " + e.getMessage());
+        /*
+         * The proxy or the target may have an issue, so insert a small delay
+         * before using these nodes further. This avoids a potential storm of
+         * "threads quota exceeded" warnings when the balancer gets out of
+         * sync with the work going on in the datanodes.
+         */
+        proxySource.activateDelay(DELAY_AFTER_ERROR);
+        target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR);
+      } finally {
+        IOUtils.closeStream(out);
+        IOUtils.closeStream(in);
+        IOUtils.closeSocket(sock);
+
+        proxySource.removePendingBlock(this);
+        target.getBalancerDatanode().removePendingBlock(this);
+
+        synchronized (this) {
+          reset();
+        }
+        synchronized (Dispatcher.this) {
+          Dispatcher.this.notifyAll();
+        }
+      }
+    }
+
+    /** Send a block replace request to the output stream */
+    private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+        Token<BlockTokenIdentifier> accessToken) throws IOException {
+      new Sender(out).replaceBlock(eb, target.storageType, accessToken, source
+          .getDatanode().getDatanodeUuid(), proxySource.datanode);
+    }
+
+    /** Receive a block copy response from the input stream */
+    private void receiveResponse(DataInputStream in) throws IOException {
+      BlockOpResponseProto response = BlockOpResponseProto
+          .parseFrom(vintPrefixed(in));
+      if (response.getStatus() != Status.SUCCESS) {
+        if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+          throw new IOException("block move failed due to access token error");
+        }
+        throw new IOException("block move failed: " + response.getMessage());
+      }
+    }
+
+    /** Reset the object. */
+    private void reset() {
+      block = null;
+      source = null;
+      proxySource = null;
+      target = null;
+    }
+  }
+
+  /** A class for keeping track of block locations in the dispatcher. */
+  private static class DBlock extends
+      MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
+    DBlock(Block block) {
+      super(block);
+    }
+  }
+
+  /** This class represents a desired move of bytes to a target. */
+  static class Task {
+    private final BalancerDatanode.StorageGroup target;
+    private long size; // bytes scheduled to move
+
+    Task(BalancerDatanode.StorageGroup target, long size) {
+      this.target = target;
+      this.size = size;
+    }
+
+    long getSize() {
+      return size;
+    }
+  }
+
+  /** A class that keeps track of a datanode. */
+  static class BalancerDatanode {
+
+    /** A group of storages in a datanode with the same storage type. */
+    class StorageGroup {
+      final StorageType storageType;
+      final double utilization;
+      final long maxSize2Move;
+      private long scheduledSize = 0L;
+
+      private StorageGroup(StorageType storageType, double utilization,
+          long maxSize2Move) {
+        this.storageType = storageType;
+        this.utilization = utilization;
+        this.maxSize2Move = maxSize2Move;
+      }
+
+      BalancerDatanode getBalancerDatanode() {
+        return BalancerDatanode.this;
+      }
+
+      DatanodeInfo getDatanode() {
+        return BalancerDatanode.this.datanode;
+      }
+
+      /** Decide if we still need to schedule more bytes to move. */
+      synchronized boolean hasSpaceForScheduling() {
+        return availableSizeToMove() > 0L;
+      }
+
+      /** @return the number of bytes that can still be scheduled to move */
+      synchronized long availableSizeToMove() {
+        return maxSize2Move - scheduledSize;
+      }
+
+      /** increment scheduled size */
+      synchronized void incScheduledSize(long size) {
+        scheduledSize += size;
+      }
+
+      /** @return scheduled size */
+      synchronized long getScheduledSize() {
+        return scheduledSize;
+      }
+
+      /** Reset scheduled size to zero. */
+      synchronized void resetScheduledSize() {
+        scheduledSize = 0L;
+      }
+
+      /** @return the name for display */
+      String getDisplayName() {
+        return datanode + ":" + storageType;
+      }
+
+      @Override
+      public String toString() {
+        return "" + utilization;
+      }
+    }
+
+    final DatanodeInfo datanode;
+    final EnumMap<StorageType, StorageGroup> storageMap
+        = new EnumMap<StorageType, StorageGroup>(StorageType.class);
+    protected long delayUntil = 0L;
+    /** blocks being moved but not confirmed yet */
+    private final List<PendingMove> pendings;
+    private final int maxConcurrentMoves;
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
+    }
+
+    private BalancerDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
+      this.datanode = r.getDatanodeInfo();
+      this.maxConcurrentMoves = maxConcurrentMoves;
+      this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
+    }
+
+    private void put(StorageType storageType, StorageGroup g) {
+      final StorageGroup existing = storageMap.put(storageType, g);
+      Preconditions.checkState(existing == null);
+    }
+
+    StorageGroup addStorageGroup(StorageType storageType, double utilization,
+        long maxSize2Move) {
+      final StorageGroup g = new StorageGroup(storageType, utilization,
+          maxSize2Move);
+      put(storageType, g);
+      return g;
+    }
+
+    Source addSource(StorageType storageType, double utilization,
+        long maxSize2Move, Dispatcher dispatcher) {
+      final Source s = dispatcher.new Source(storageType, utilization,
+          maxSize2Move, this);
+      put(storageType, s);
+      return s;
+    }
+
+    private synchronized void activateDelay(long delta) {
+      delayUntil = Time.monotonicNow() + delta;
+    }
+
+    private synchronized boolean isDelayActive() {
+      if (delayUntil == 0 || Time.monotonicNow() > delayUntil) {
+        delayUntil = 0;
+        return false;
+      }
+      return true;
+    }
+
+    /** Check if the node can schedule more blocks to move */
+    synchronized boolean isPendingQNotFull() {
+      return pendings.size() < maxConcurrentMoves;
+    }
+
+    /** Check if all the dispatched moves are done */
+    synchronized boolean isPendingQEmpty() {
+      return pendings.isEmpty();
+    }
+
+    /** Add a scheduled block move to the node */
+    synchronized boolean addPendingBlock(PendingMove pendingBlock) {
+      if (!isDelayActive() && isPendingQNotFull()) {
+        return pendings.add(pendingBlock);
+      }
+      return false;
+    }
+
+    /** Remove a scheduled block move from the node */
+    synchronized boolean removePendingBlock(PendingMove pendingBlock) {
+      return pendings.remove(pendingBlock);
+    }
+  }
+
+  /** A storage group that can be the source of block moves. */
+  class Source extends BalancerDatanode.StorageGroup {
+
+    private final List<Task> tasks = new ArrayList<Task>(2);
+    private long blocksToReceive = 0L;
+    /**
+     * Source blocks point to the objects in {@link Dispatcher#globalBlocks}
+     * because we want to keep one copy of a block and be aware that the
+     * locations are changing over time.
+     */
+    private final List<DBlock> srcBlocks = new ArrayList<DBlock>();
+
+    private Source(StorageType storageType, double utilization,
+        long maxSize2Move, BalancerDatanode dn) {
+      dn.super(storageType, utilization, maxSize2Move);
+    }
+
+    /** Add a task */
+    void addTask(Task task) {
+      Preconditions.checkState(task.target != this,
+          "Source and target are the same storage group " + getDisplayName());
+      incScheduledSize(task.size);
+      tasks.add(task);
+    }
+
+    /** @return an iterator to this source's blocks */
+    Iterator<DBlock> getBlockIterator() {
+      return srcBlocks.iterator();
+    }
+
+    /**
+     * Fetch new blocks of this source from the namenode and update this
+     * source's block list and {@link Dispatcher#globalBlocks}.
+     * 
+     * @return the total size of the received blocks, in bytes.
+     */
+    private long getBlockList() throws IOException {
+      final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
+      final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanode(), size);
+
+      long bytesReceived = 0;
+      for (BlockWithLocations blk : newBlocks.getBlocks()) {
+        bytesReceived += blk.getBlock().getNumBytes();
+        synchronized (globalBlocks) {
+          final DBlock block = globalBlocks.get(blk.getBlock());
+          synchronized (block) {
+            block.clearLocations();
+
+            // update locations
+            final String[] datanodeUuids = blk.getDatanodeUuids();
+            final StorageType[] storageTypes = blk.getStorageTypes();
+            for (int i = 0; i < datanodeUuids.length; i++) {
+              final BalancerDatanode.StorageGroup g = storageGroupMap.get(
+                  datanodeUuids[i], storageTypes[i]);
+              if (g != null) { // not unknown
+                block.addLocation(g);
+              }
+            }
+          }
+          if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
+            // filter bad candidates
+            srcBlocks.add(block);
+          }
+        }
+      }
+      return bytesReceived;
+    }
+
+    /** Decide if the given block is a good candidate to move or not */
+    private boolean isGoodBlockCandidate(DBlock block) {
+      for (Task t : tasks) {
+        if (Dispatcher.this.isGoodBlockCandidate(this, t.target, block)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Choose a move for the source: the block, the target, and the proxy are
+     * all determined. Source and target throttling is taken into account, so
+     * they are chosen only if they have the capacity to support this block
+     * move. The move should be dispatched immediately after this method
+     * returns.
+     * 
+     * @return a move that is good for the source to dispatch immediately, or
+     *         null if no move can be scheduled.
+     */
+    private PendingMove chooseNextMove() {
+      for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
+        final Task task = i.next();
+        final BalancerDatanode target = task.target.getBalancerDatanode();
+        PendingMove pendingBlock = new PendingMove();
+        if (target.addPendingBlock(pendingBlock)) {
+          // target is not busy, so do a tentative block allocation
+          pendingBlock.source = this;
+          pendingBlock.target = task.target;
+          if (pendingBlock.chooseBlockAndProxy()) {
+            long blockSize = pendingBlock.block.getNumBytes();
+            incScheduledSize(-blockSize);
+            task.size -= blockSize;
+            if (task.size == 0) {
+              i.remove();
+            }
+            return pendingBlock;
+          } else {
+            // cancel the tentative move
+            target.removePendingBlock(pendingBlock);
+          }
+        }
+      }
+      return null;
+    }
+
+    /** Remove already moved blocks from the source's block list. */
+    private void removeMovedBlocks() {
+      for (Iterator<DBlock> i = getBlockIterator(); i.hasNext();) {
+        if (movedBlocks.contains(i.next().getBlock())) {
+          i.remove();
+        }
+      }
+    }
+
+    private static final int SOURCE_BLOCKS_MIN_SIZE = 5;
+
+    /** @return true if more blocks should be fetched from the namenode */
+    private boolean shouldFetchMoreBlocks() {
+      return srcBlocks.size() < SOURCE_BLOCKS_MIN_SIZE && blocksToReceive > 0;
+    }
+
+    private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
+
+    /**
+     * This method iteratively does the following: it first selects a block to
+     * move and sends a request to the proxy source to start the block move;
+     * when the source's block list falls below a threshold, it asks the
+     * namenode for more blocks. It terminates when it has dispatched enough
+     * block move tasks, when it has received enough blocks from the namenode,
+     * or when the elapsed time of the iteration has exceeded the max time
+     * limit.
+     */
+    private void dispatchBlocks() {
+      final long startTime = Time.monotonicNow();
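+      // Request twice the scheduled bytes from the namenode since not every
+      // fetched block will be a good move candidate (heuristic headroom).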
+      this.blocksToReceive = 2 * getScheduledSize();
+      boolean isTimeUp = false;
+      int noPendingBlockIteration = 0;
+      while (!isTimeUp && getScheduledSize() > 0
+          && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
+        final PendingMove p = chooseNextMove();
+        if (p != null) {
+          // move the block
+          moveExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+              p.dispatch();
+            }
+          });
+          continue;
+        }
+
+        // Since we cannot schedule any block to move, remove any already
+        // moved blocks from the source block list.
+        removeMovedBlocks();
+        // Then check if we should fetch more blocks from the namenode.
+        if (shouldFetchMoreBlocks()) {
+          // fetch new blocks
+          try {
+            blocksToReceive -= getBlockList();
+            continue;
+          } catch (IOException e) {
+            LOG.warn("Exception while getting block list", e);
+            return;
+          }
+        } else {
+          // The source could not find a block move to schedule in this
+          // iteration.
+          noPendingBlockIteration++;
+          // If no blocks can be moved after MAX_NO_PENDING_MOVE_ITERATIONS
+          // iterations, give up on this source by resetting its schedule.
+          if (noPendingBlockIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
+            resetScheduledSize();
+          }
+        }
+
+        // check if time is up or not
+        if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
+          isTimeUp = true;
+          continue;
+        }
+
+        // Now we cannot schedule any block to move, and no new blocks have
+        // been added to the source block list, so we wait.
+        try {
+          synchronized (Dispatcher.this) {
+            Dispatcher.this.wait(1000); // wait for targets/sources to be idle
+          }
+        } catch (InterruptedException ignored) {
+        }
+      }
+    }
+  }
+
+  Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
+      Set<String> excludedNodes, Configuration conf) {
+    this.nnc = nnc;
+    this.keyManager = nnc.getKeyManager();
+    this.excludedNodes = excludedNodes;
+    this.includedNodes = includedNodes;
+
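+    // A moved block is remembered for this time window so that it is not
+    // scheduled to move again while the namenode's block locations for it
+    // may still be stale.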
+    final long movedWinWidth = conf.getLong(
+        DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
+        DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+    movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
+
+    this.cluster = NetworkTopology.getInstance(conf);
+
+    this.moveExecutor = Executors.newFixedThreadPool(conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
+        DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT));
+    this.dispatchExecutor = Executors.newFixedThreadPool(conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
+        DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
+    this.maxConcurrentMovesPerNode = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+
+    final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
+        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+    this.saslClient = new SaslDataTransferClient(
+        DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+        TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed);
+  }
+
+  StorageGroupMap getStorageGroupMap() {
+    return storageGroupMap;
+  }
+
+  NetworkTopology getCluster() {
+    return cluster;
+  }
+  
+  long getBytesMoved() {
+    return bytesMoved.get();
+  }
+
+  long bytesToMove() {
+    Preconditions.checkState(
+        storageGroupMap.size() >= sources.size() + targets.size(),
+        "Mismatched number of storage groups (" + storageGroupMap.size()
+            + " < " + sources.size() + " sources + " + targets.size()
+            + " targets)");
+
+    long b = 0L;
+    for (Source src : sources) {
+      b += src.getScheduledSize();
+    }
+    return b;
+  }
+
+  void add(Source source, BalancerDatanode.StorageGroup target) {
+    sources.add(source);
+    targets.add(target);
+  }
+
+  private boolean shouldIgnore(DatanodeInfo dn) {
+    // ignore decommissioned nodes
+    final boolean decommissioned = dn.isDecommissioned();
+    // ignore decommissioning nodes
+    final boolean decommissioning = dn.isDecommissionInProgress();
+    // ignore nodes in exclude list
+    final boolean excluded = Util.isExcluded(excludedNodes, dn);
+    // ignore nodes not in the include list (if include list is not empty)
+    final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
+
+    if (decommissioned || decommissioning || excluded || notIncluded) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Excluding datanode " + dn + ": decommissioned="
+            + decommissioned + ", decommissioning=" + decommissioning
+            + ", excluded=" + excluded + ", notIncluded=" + notIncluded);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /** Get live datanode storage reports and then build the network topology. */
+  List<DatanodeStorageReport> init() throws IOException {
+    final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
+    final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>(); 
+    // create network topology and classify utilization collections:
+    // over-utilized, above-average, below-average and under-utilized.
+    for (DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
+      final DatanodeInfo datanode = r.getDatanodeInfo();
+      if (shouldIgnore(datanode)) {
+        continue;
+      }
+      trimmed.add(r);
+      cluster.add(datanode);
+    }
+    return trimmed;
+  }
+
+  public BalancerDatanode newDatanode(DatanodeStorageReport r) {
+    return new BalancerDatanode(r, maxConcurrentMovesPerNode);
+  }
+
+  public boolean dispatchAndCheckContinue() throws InterruptedException {
+    return nnc.shouldContinue(dispatchBlockMoves());
+  }
+
+  /**
+   * Dispatch block moves for each source. A thread per source selects blocks
+   * to move and sends requests to the proxy sources to initiate the block
+   * moves. The process is flow controlled: block selection is blocked if
+   * there are too many unconfirmed block moves.
+   * 
+   * @return the total number of bytes successfully moved in this iteration.
+   */
+  private long dispatchBlockMoves() throws InterruptedException {
+    final long bytesLastMoved = bytesMoved.get();
+    final Future<?>[] futures = new Future<?>[sources.size()];
+
+    final Iterator<Source> i = sources.iterator();
+    for (int j = 0; j < futures.length; j++) {
+      final Source s = i.next();
+      futures[j] = dispatchExecutor.submit(new Runnable() {
+        @Override
+        public void run() {
+          s.dispatchBlocks();
+        }
+      });
+    }
+
+    // wait for all dispatcher threads to finish
+    for (Future<?> future : futures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        LOG.warn("Dispatcher thread failed", e.getCause());
+      }
+    }
+
+    // wait for all block moving to be done
+    waitForMoveCompletion();
+
+    return bytesMoved.get() - bytesLastMoved;
+  }
+
+  /** The time to sleep between checks for block move completion. */
+  private static long blockMoveWaitTime = 30000L;
+
+  /** Set the sleep time between block move completion checks. */
+  static void setBlockMoveWaitTime(long time) {
+    blockMoveWaitTime = time;
+  }
+
+  /** Wait for all block move confirmations. */
+  private void waitForMoveCompletion() {
+    for(;;) {
+      boolean empty = true;
+      for (BalancerDatanode.StorageGroup t : targets) {
+        if (!t.getBalancerDatanode().isPendingQEmpty()) {
+          empty = false;
+          break;
+        }
+      }
+      if (empty) {
+        return; // all pending queues are empty
+      }
+      try {
+        Thread.sleep(blockMoveWaitTime);
+      } catch (InterruptedException ignored) {
+      }
+    }
+  }
+
+  /**
+   * Decide if the block is a good candidate to be moved from source to target.
+   * A block is a good candidate if
+   * 1. the block is not in the process of being moved/has not been moved;
+   * 2. the block does not have a replica on the target;
+   * 3. doing the move does not reduce the number of racks that the block has.
+   */
+  private boolean isGoodBlockCandidate(Source source,
+      BalancerDatanode.StorageGroup target, DBlock block) {
+    if (source.storageType != target.storageType) {
+      return false;
+    }
+    // check if the block is moved or not
+    if (movedBlocks.contains(block.getBlock())) {
+      return false;
+    }
+    if (block.isLocatedOn(target)) {
+      return false;
+    }
+    if (cluster.isNodeGroupAware()
+        && isOnSameNodeGroupWithReplicas(target, block, source)) {
+      return false;
+    }
+    if (reduceNumOfRacks(source, target, block)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Determine whether moving the given block replica from source to target
+   * would reduce the number of racks of the block replicas.
+   */
+  private boolean reduceNumOfRacks(Source source,
+      BalancerDatanode.StorageGroup target, DBlock block) {
+    final DatanodeInfo sourceDn = source.getDatanode();
+    if (cluster.isOnSameRack(sourceDn, target.getDatanode())) {
+      // source and target are on the same rack
+      return false;
+    }
+    boolean notOnSameRack = true;
+    synchronized (block) {
+      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
+        if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
+          notOnSameRack = false;
+          break;
+        }
+      }
+    }
+    if (notOnSameRack) {
+      // target is not on the same rack as any replica
+      return false;
+    }
+    for (BalancerDatanode.StorageGroup g : block.getLocations()) {
+      if (g != source && cluster.isOnSameRack(g.getDatanode(), sourceDn)) {
+        // source is on the same rack of another replica
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if there are any replicas (other than the source) on the same node
+   * group as the target. If so, the target is not a good candidate for
+   * placing this replica, since we do not want two replicas in the same node
+   * group.
+   * 
+   * @return true if there is a replica (other than the source) on the same
+   *         node group as the target
+   */
+  private boolean isOnSameNodeGroupWithReplicas(
+      BalancerDatanode.StorageGroup target, DBlock block, Source source) {
+    final DatanodeInfo targetDn = target.getDatanode();
+    for (BalancerDatanode.StorageGroup g : block.getLocations()) {
+      if (g != source && cluster.isOnSameNodeGroup(g.getDatanode(), targetDn)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Reset all fields in order to prepare for the next iteration */
+  void reset(Configuration conf) {
+    cluster = NetworkTopology.getInstance(conf);
+    storageGroupMap.clear();
+    sources.clear();
+    targets.clear();
+    globalBlocks.removeAllButRetain(movedBlocks);
+    movedBlocks.cleanup();
+  }
+
+  /** Shut down the thread pools. */
+  void shutdownNow() {
+    dispatchExecutor.shutdownNow();
+    moveExecutor.shutdownNow();
+  }
+
+  static class Util {
+    /** @return true if the datanode is in the excludedNodes set. */
+    static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) {
+      return isIn(excludedNodes, dn);
+    }
+
+    /**
+     * @return true if includedNodes is empty or the datanode is in the
+     *         includedNodes set.
+     */
+    static boolean isIncluded(Set<String> includedNodes, DatanodeInfo dn) {
+      return (includedNodes.isEmpty() || isIn(includedNodes, dn));
+    }
+
+    /**
+     * Matching is done against the host name and the IP address, each with
+     * and without the port number.
+     * 
+     * @return true if the datanode's transfer address matches the set of nodes.
+     */
+    private static boolean isIn(Set<String> datanodes, DatanodeInfo dn) {
+      return isIn(datanodes, dn.getPeerHostName(), dn.getXferPort())
+          || isIn(datanodes, dn.getIpAddr(), dn.getXferPort())
+          || isIn(datanodes, dn.getHostName(), dn.getXferPort());
+    }
+
+    /** @return true if nodes contains host or host:port */
+    private static boolean isIn(Set<String> nodes, String host, int port) {
+      if (host == null) {
+        return false;
+      }
+      return (nodes.contains(host) || nodes.contains(host + ":" + port));
+    }
+
+    /**
+     * Parse a comma-separated string to obtain a set of host names.
+     * 
+     * @return the set of host names
+     */
+    static Set<String> parseHostList(String string) {
+      String[] addrs = StringUtils.getTrimmedStrings(string);
+      return new HashSet<String>(Arrays.asList(addrs));
+    }
+
+    /**
+     * Read a set of host names from a file.
+     * 
+     * @return the set of host names
+     */
+    static Set<String> getHostListFromFile(String fileName, String type) {
+      Set<String> nodes = new HashSet<String>();
+      try {
+        HostsFileReader.readFileToSet(type, fileName, nodes);
+        return StringUtils.getTrimmedStrings(nodes);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            "Failed to read host list from file: " + fileName, e);
+      }
+    }
+  }
+}