Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [10/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Aug 19 23:49:39 2014
@@ -18,17 +18,9 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.PrintStream;
-import java.net.Socket;
 import java.net.URI;
 import java.text.DateFormat;
 import java.util.ArrayList;
@@ -37,51 +29,38 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.Formatter;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.base.Preconditions;
+
 /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
  * when some datanodes become full or when new empty nodes join the cluster.
  * The tool is deployed as an application program that can be run by the 
@@ -182,667 +161,41 @@ import org.apache.hadoop.util.ToolRunner
 @InterfaceAudience.Private
 public class Balancer {
   static final Log LOG = LogFactory.getLog(Balancer.class);
-  final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
-  private static long WIN_WIDTH = 5400*1000L; // 1.5 hours
 
-  /** The maximum number of concurrent block moves for
-   * balancing purposes at a datanode
-   */
-  public static final int MAX_NUM_CONCURRENT_MOVES = 5;
-  private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
-  public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
-  public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
-  
+  private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+
+  private static final long GB = 1L << 30; //1GB
+  private static final long MAX_SIZE_TO_MOVE = 10*GB;
+
   private static final String USAGE = "Usage: java "
       + Balancer.class.getSimpleName()
       + "\n\t[-policy <policy>]\tthe balancing policy: "
       + BalancingPolicy.Node.INSTANCE.getName() + " or "
       + BalancingPolicy.Pool.INSTANCE.getName()
-      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
+      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity"
+      + "\n\t[-exclude [-f <hosts-file> | comma-separated list of hosts]]"
+      + "\tExcludes the specified datanodes."
+      + "\n\t[-include [-f <hosts-file> | comma-separated list of hosts]]"
+      + "\tIncludes only the specified datanodes.";
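
For reference, the new filter options in this usage string would be invoked along these lines. This is an illustrative sketch: the hdfs balancer entry point is standard, while "datanode" and "blockpool" assume those are the names returned by BalancingPolicy.Node and BalancingPolicy.Pool.

    $ hdfs balancer -threshold 5 -policy datanode -exclude -f /tmp/excluded-hosts
    $ hdfs balancer -policy blockpool -include dn1.example.com,dn2.example.com
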
   
-  private final NameNodeConnector nnc;
+  private final Dispatcher dispatcher;
   private final BalancingPolicy policy;
   private final double threshold;
   
   // all data node lists
-  private final Collection<Source> overUtilizedDatanodes
-                               = new LinkedList<Source>();
-  private final Collection<Source> aboveAvgUtilizedDatanodes
-                               = new LinkedList<Source>();
-  private final Collection<BalancerDatanode> belowAvgUtilizedDatanodes
-                               = new LinkedList<BalancerDatanode>();
-  private final Collection<BalancerDatanode> underUtilizedDatanodes
-                               = new LinkedList<BalancerDatanode>();
-  
-  private final Collection<Source> sources
-                               = new HashSet<Source>();
-  private final Collection<BalancerDatanode> targets
-                               = new HashSet<BalancerDatanode>();
-  
-  private final Map<Block, BalancerBlock> globalBlockList
-                 = new HashMap<Block, BalancerBlock>();
-  private final MovedBlocks movedBlocks = new MovedBlocks();
-  /** Map (datanodeUuid -> BalancerDatanodes) */
-  private final Map<String, BalancerDatanode> datanodeMap
-      = new HashMap<String, BalancerDatanode>();
-  
-  private NetworkTopology cluster;
-
-  private final ExecutorService moverExecutor;
-  private final ExecutorService dispatcherExecutor;
-
-  /* This class keeps track of a scheduled block move */
-  private class PendingBlockMove {
-    private BalancerBlock block;
-    private Source source;
-    private BalancerDatanode proxySource;
-    private BalancerDatanode target;
-    
-    /** constructor */
-    private PendingBlockMove() {
-    }
-    
-    @Override
-    public String toString() {
-      final Block b = block.getBlock();
-      return b + " with size=" + b.getNumBytes() + " from "
-          + source.getDisplayName() + " to " + target.getDisplayName()
-          + " through " + proxySource.getDisplayName();
-    }
-
-    /* choose a block & a proxy source for this pendingMove 
-     * whose source & target have already been chosen.
-     * 
-     * Return true if a block and its proxy are chosen; false otherwise
-     */
-    private boolean chooseBlockAndProxy() {
-      // iterate all source's blocks until find a good one
-      for (Iterator<BalancerBlock> blocks=
-        source.getBlockIterator(); blocks.hasNext();) {
-        if (markMovedIfGoodBlock(blocks.next())) {
-          blocks.remove();
-          return true;
-        }
-      }
-      return false;
-    }
-    
-    /* Return true if the given block is good for the tentative move;
-     * If it is good, add it to the moved list to mark it as "Moved".
-     * A block is good if
-     * 1. it is a good candidate; see isGoodBlockCandidate
-     * 2. can find a proxy source that's not busy for this move
-     */
-    private boolean markMovedIfGoodBlock(BalancerBlock block) {
-      synchronized(block) {
-        synchronized(movedBlocks) {
-          if (isGoodBlockCandidate(source, target, block)) {
-            this.block = block;
-            if ( chooseProxySource() ) {
-              movedBlocks.add(block);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Decided to move " + this);
-              }
-              return true;
-            }
-          }
-        }
-      }
-      return false;
-    }
-    
-    /* Now that we have found the source, target, and block, we need to find a proxy
-     * 
-     * @return true if a proxy is found; otherwise false
-     */
-    private boolean chooseProxySource() {
-      final DatanodeInfo targetDN = target.getDatanode();
-      // if the cluster is node group aware, first try replicas in the same node group
-      if (cluster.isNodeGroupAware()) {
-        for (BalancerDatanode loc : block.getLocations()) {
-          if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) {
-            return true;
-          }
-        }
-      }
-      // check if there is replica which is on the same rack with the target
-      for (BalancerDatanode loc : block.getLocations()) {
-        if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
-          return true;
-        }
-      }
-      // find out a non-busy replica
-      for (BalancerDatanode loc : block.getLocations()) {
-        if (addTo(loc)) {
-          return true;
-        }
-      }
-      return false;
-    }
-    
-    // add a BalancerDatanode as proxy source for specific block movement
-    private boolean addTo(BalancerDatanode bdn) {
-      if (bdn.addPendingBlock(this)) {
-        proxySource = bdn;
-        return true;
-      }
-      return false;
-    }
-    
-    /* Dispatch the block move task to the proxy source & wait for the response
-     */
-    private void dispatch() {
-      Socket sock = new Socket();
-      DataOutputStream out = null;
-      DataInputStream in = null;
-      try {
-        sock.connect(
-            NetUtils.createSocketAddr(target.datanode.getXferAddr()),
-            HdfsServerConstants.READ_TIMEOUT);
-        /* Unfortunately we don't have a good way to know if the Datanode is
-         * taking a really long time to move a block, OR something has
-         * gone wrong and it's never going to finish. To deal with this 
-         * scenario, we set a long timeout (20 minutes) to avoid hanging
-         * the balancer indefinitely.
-         */
-        sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
-
-        sock.setKeepAlive(true);
-        
-        OutputStream unbufOut = sock.getOutputStream();
-        InputStream unbufIn = sock.getInputStream();
-        if (nnc.getDataEncryptionKey() != null) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  unbufOut, unbufIn, nnc.getDataEncryptionKey());
-          unbufOut = encryptedStreams.out;
-          unbufIn = encryptedStreams.in;
-        }
-        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            HdfsConstants.IO_FILE_BUFFER_SIZE));
-        in = new DataInputStream(new BufferedInputStream(unbufIn,
-            HdfsConstants.IO_FILE_BUFFER_SIZE));
-        
-        sendRequest(out);
-        receiveResponse(in);
-        bytesMoved.inc(block.getNumBytes());
-        LOG.info("Successfully moved " + this);
-      } catch (IOException e) {
-        LOG.warn("Failed to move " + this + ": " + e.getMessage());
-        /* proxy or target may have an issue, insert a small delay
-         * before using these nodes further. This avoids a potential storm
-         * of "threads quota exceeded" warnings when the balancer
-         * gets out of sync with work going on in the datanode.
-         */
-        proxySource.activateDelay(DELAY_AFTER_ERROR);
-        target.activateDelay(DELAY_AFTER_ERROR);
-      } finally {
-        IOUtils.closeStream(out);
-        IOUtils.closeStream(in);
-        IOUtils.closeSocket(sock);
-        
-        proxySource.removePendingBlock(this);
-        target.removePendingBlock(this);
-
-        synchronized (this ) {
-          reset();
-        }
-        synchronized (Balancer.this) {
-          Balancer.this.notifyAll();
-        }
-      }
-    }
-    
-    /* Send a block replace request to the output stream*/
-    private void sendRequest(DataOutputStream out) throws IOException {
-      final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
-      final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
-      new Sender(out).replaceBlock(eb, accessToken,
-          source.getStorageID(), proxySource.getDatanode());
-    }
-    
-    /* Receive a block copy response from the input stream */ 
-    private void receiveResponse(DataInputStream in) throws IOException {
-      BlockOpResponseProto response = BlockOpResponseProto.parseFrom(
-          vintPrefixed(in));
-      if (response.getStatus() != Status.SUCCESS) {
-        if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
-          throw new IOException("block move failed due to access token error");
-        throw new IOException("block move failed: " +
-            response.getMessage());
-      }
-    }
-
-    /* reset the object */
-    private void reset() {
-      block = null;
-      source = null;
-      proxySource = null;
-      target = null;
-    }
-    
-    /* start a thread to dispatch the block move */
-    private void scheduleBlockMove() {
-      moverExecutor.execute(new Runnable() {
-        @Override
-        public void run() {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Start moving " + PendingBlockMove.this);
-          }
-          dispatch();
-        }
-      });
-    }
-  }
-  
-  /* A class for keeping track of blocks in the Balancer */
-  static private class BalancerBlock {
-    private final Block block; // the block
-    private final List<BalancerDatanode> locations
-            = new ArrayList<BalancerDatanode>(3); // its locations
-    
-    /* Constructor */
-    private BalancerBlock(Block block) {
-      this.block = block;
-    }
-    
-    /* clean block locations */
-    private synchronized void clearLocations() {
-      locations.clear();
-    }
-    
-    /* add a location */
-    private synchronized void addLocation(BalancerDatanode datanode) {
-      if (!locations.contains(datanode)) {
-        locations.add(datanode);
-      }
-    }
-    
-    /* Return if the block is located on <code>datanode</code> */
-    private synchronized boolean isLocatedOnDatanode(
-        BalancerDatanode datanode) {
-      return locations.contains(datanode);
-    }
-    
-    /* Return its locations */
-    private synchronized List<BalancerDatanode> getLocations() {
-      return locations;
-    }
-    
-    /* Return the block */
-    private Block getBlock() {
-      return block;
-    }
-    
-    /* Return the length of the block */
-    private long getNumBytes() {
-      return block.getNumBytes();
-    }
-  }
-  
-  /* This class represents a desired move of bytes from a source node
-   * to a target node.
-   * An object of this class is stored in the source node.
-   */
-  static private class NodeTask {
-    private final BalancerDatanode datanode; //target node
-    private long size;  //bytes scheduled to move
-    
-    /* constructor */
-    private NodeTask(BalancerDatanode datanode, long size) {
-      this.datanode = datanode;
-      this.size = size;
-    }
-    
-    /* Get the node */
-    private BalancerDatanode getDatanode() {
-      return datanode;
-    }
-    
-    /* Get the number of bytes that need to be moved */
-    private long getSize() {
-      return size;
-    }
-  }
-  
-  
-  /* A class that keeps track of a datanode in Balancer */
-  private static class BalancerDatanode {
-    final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
-    final DatanodeInfo datanode;
-    final double utilization;
-    final long maxSize2Move;
-    private long scheduledSize = 0L;
-    protected long delayUntil = 0L;
-    //  blocks being moved but not confirmed yet
-    private final List<PendingBlockMove> pendingBlocks =
-      new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); 
-    
-    @Override
-    public String toString() {
-      return getClass().getSimpleName() + "[" + datanode
-          + ", utilization=" + utilization + "]";
-    }
-
-    /* Constructor 
-     * Depending on avgutil & threshold, calculate maximum bytes to move 
-     */
-    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
-      datanode = node;
-      utilization = policy.getUtilization(node);
-      final double avgUtil = policy.getAvgUtilization();
-      long maxSizeToMove;
-
-      if (utilization >= avgUtil+threshold
-          || utilization <= avgUtil-threshold) { 
-        maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
-      } else {
-        maxSizeToMove = 
-          (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100);
-      }
-      if (utilization < avgUtil ) {
-        maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
-      }
-      this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
-    }
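
(A worked example of the sizing rule above, with illustrative numbers: given avgUtil = 50%, threshold = 10%, and a 1 TB datanode at 75% utilization, the first branch applies and maxSizeToMove = 10% of 1 TB = 100 GB, which the final Math.min caps at MAX_SIZE_TO_MOVE = 10 GB. A node at 55% takes the second branch instead: |50 - 55| = 5% of 1 TB = 50 GB, again capped at 10 GB.)
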
-    
-    /** Get the datanode */
-    protected DatanodeInfo getDatanode() {
-      return datanode;
-    }
-    
-    /** Get the name of the datanode */
-    protected String getDisplayName() {
-      return datanode.toString();
-    }
-    
-    /* Get the storage id of the datanode */
-    protected String getStorageID() {
-      return datanode.getDatanodeUuid();
-    }
-    
-    /** Decide if more bytes still need to be moved */
-    protected synchronized boolean hasSpaceForScheduling() {
-      return scheduledSize<maxSize2Move;
-    }
-
-    /** Return the total number of bytes that need to be moved */
-    protected synchronized long availableSizeToMove() {
-      return maxSize2Move-scheduledSize;
-    }
-    
-    /** increment scheduled size */
-    protected synchronized void incScheduledSize(long size) {
-      scheduledSize += size;
-    }
-    
-    /** decrement scheduled size */
-    protected synchronized void decScheduledSize(long size) {
-      scheduledSize -= size;
-    }
-    
-    /** get scheduled size */
-    protected synchronized long getScheduledSize(){
-      return scheduledSize;
-    }
-    
-    /** set scheduled size */
-    protected synchronized void setScheduledSize(long size){
-      scheduledSize = size;
-    }
-
-    synchronized private void activateDelay(long delta) {
-      delayUntil = Time.now() + delta;
-    }
-
-    synchronized private boolean isDelayActive() {
-      if (delayUntil == 0 || Time.now() > delayUntil){
-        delayUntil = 0;
-        return false;
-      }
-        return true;
-    }
-    
-    /* Check if the node can schedule more blocks to move */
-    synchronized private boolean isPendingQNotFull() {
-      if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
-        return true;
-      }
-      return false;
-    }
-    
-    /* Check if all the dispatched moves are done */
-    synchronized private boolean isPendingQEmpty() {
-      return pendingBlocks.isEmpty();
-    }
-    
-    /* Add a scheduled block move to the node */
-    private synchronized boolean addPendingBlock(
-        PendingBlockMove pendingBlock) {
-      if (!isDelayActive() && isPendingQNotFull()) {
-        return pendingBlocks.add(pendingBlock);
-      }
-      return false;
-    }
-    
-    /* Remove a scheduled block move from the node */
-    private synchronized boolean  removePendingBlock(
-        PendingBlockMove pendingBlock) {
-      return pendingBlocks.remove(pendingBlock);
-    }
-  }
-  
-  /** A node that can be the source of a block move */
-  private class Source extends BalancerDatanode {
-    
-    /* A thread that initiates a block move 
-     * and waits for block move to complete */
-    private class BlockMoveDispatcher implements Runnable {
-      @Override
-      public void run() {
-        dispatchBlocks();
-      }
-    }
-    
-    private final ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2);
-    private long blocksToReceive = 0L;
-    /* source blocks point to balancerBlocks in the global list because
-     * we want to keep one copy of a block in balancer and be aware that
-     * the locations are changing over time.
-     */
-    private final List<BalancerBlock> srcBlockList
-            = new ArrayList<BalancerBlock>();
-    
-    /* constructor */
-    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
-      super(node, policy, threshold);
-    }
-    
-    /** Add a node task */
-    private void addNodeTask(NodeTask task) {
-      assert (task.datanode != this) :
-        "Source and target are the same " + datanode;
-      incScheduledSize(task.getSize());
-      nodeTasks.add(task);
-    }
-    
-    /* Return an iterator to this source's blocks */
-    private Iterator<BalancerBlock> getBlockIterator() {
-      return srcBlockList.iterator();
-    }
-    
-    /* Fetch new blocks of this source from the namenode and
-     * update this source's block list & the global block list.
-     * Return the total size of the received blocks in bytes.
-     */
-    private long getBlockList() throws IOException {
-      BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode, 
-        Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
-      long bytesReceived = 0;
-      for (BlockWithLocations blk : newBlocks) {
-        bytesReceived += blk.getBlock().getNumBytes();
-        BalancerBlock block;
-        synchronized(globalBlockList) {
-          block = globalBlockList.get(blk.getBlock());
-          if (block==null) {
-            block = new BalancerBlock(blk.getBlock());
-            globalBlockList.put(blk.getBlock(), block);
-          } else {
-            block.clearLocations();
-          }
-        
-          synchronized (block) {
-            // update locations
-            for (String datanodeUuid : blk.getDatanodeUuids()) {
-              final BalancerDatanode d = datanodeMap.get(datanodeUuid);
-              if (d != null) { // not an unknown datanode
-                block.addLocation(d);
-              }
-            }
-          }
-          if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) {
-            // filter bad candidates
-            srcBlockList.add(block);
-          }
-        }
-      }
-      return bytesReceived;
-    }
-
-    /* Decide if the given block is a good candidate to move or not */
-    private boolean isGoodBlockCandidate(BalancerBlock block) {
-      for (NodeTask nodeTask : nodeTasks) {
-        if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /* Return a block that's good for the source thread to dispatch immediately.
-     * The block's source, target, and proxy source are determined too.
-     * When choosing proxy and target, source & target throttling
-     * has been considered. They are chosen only when they have the capacity
-     * to support this block move.
-     * The block should be dispatched immediately after this method returns.
-     */
-    private PendingBlockMove chooseNextBlockToMove() {
-      for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) {
-        NodeTask task = tasks.next();
-        BalancerDatanode target = task.getDatanode();
-        PendingBlockMove pendingBlock = new PendingBlockMove();
-        if (target.addPendingBlock(pendingBlock)) { 
-          // target is not busy, so do a tentative block allocation
-          pendingBlock.source = this;
-          pendingBlock.target = target;
-          if ( pendingBlock.chooseBlockAndProxy() ) {
-            long blockSize = pendingBlock.block.getNumBytes();
-            decScheduledSize(blockSize);
-            task.size -= blockSize;
-            if (task.size == 0) {
-              tasks.remove();
-            }
-            return pendingBlock;
-          } else {
-            // cancel the tentative move
-            target.removePendingBlock(pendingBlock);
-          }
-        }
-      }
-      return null;
-    }
-
-    /* iterate all source's blocks to remove moved ones */    
-    private void filterMovedBlocks() {
-      for (Iterator<BalancerBlock> blocks=getBlockIterator();
-            blocks.hasNext();) {
-        if (movedBlocks.contains(blocks.next())) {
-          blocks.remove();
-        }
-      }
-    }
-    
-    private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5;
-    /* Return if we should fetch more blocks from the namenode */
-    private boolean shouldFetchMoreBlocks() {
-      return srcBlockList.size()<SOURCE_BLOCK_LIST_MIN_SIZE &&
-                 blocksToReceive>0;
-    }
-    
-    /* This method iteratively does the following:
-     * it first selects a block to move,
-     * then sends a request to the proxy source to start the block move;
-     * when the source's block list falls below a threshold, it asks
-     * the namenode for more blocks.
-     * It terminates when it has dispatched enough block move tasks or
-     * it has received enough blocks from the namenode, or 
-     * the elapsed time of the iteration has exceeded the max time limit.
-     */ 
-    private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
-    private void dispatchBlocks() {
-      long startTime = Time.now();
-      long scheduledSize = getScheduledSize();
-      this.blocksToReceive = 2*scheduledSize;
-      boolean isTimeUp = false;
-      int noPendingBlockIteration = 0;
-      while(!isTimeUp && getScheduledSize()>0 &&
-          (!srcBlockList.isEmpty() || blocksToReceive>0)) {
-        PendingBlockMove pendingBlock = chooseNextBlockToMove();
-        if (pendingBlock != null) {
-          // move the block
-          pendingBlock.scheduleBlockMove();
-          continue;
-        }
-        
-        /* Since we can not schedule any block to move,
-         * filter any moved blocks from the source block list and
-         * check if we should fetch more blocks from the namenode
-         */
-        filterMovedBlocks(); // filter already moved blocks
-        if (shouldFetchMoreBlocks()) {
-          // fetch new blocks
-          try {
-            blocksToReceive -= getBlockList();
-            continue;
-          } catch (IOException e) {
-            LOG.warn("Exception while getting block list", e);
-            return;
-          }
-        } else {
-          // source node cannot find a pendingBlockToMove, iteration +1
-          noPendingBlockIteration++;
-          // in case no blocks can be moved for source node's task,
-          // jump out of while-loop after 5 iterations.
-          if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
-            setScheduledSize(0);
-          }
-        }
-        
-        // check if time is up or not
-        if (Time.now()-startTime > MAX_ITERATION_TIME) {
-          isTimeUp = true;
-          continue;
-        }
-        
-        /* Now we can not schedule any block to move and there are
-         * no new blocks added to the source block list, so we wait. 
-         */
-        try {
-          synchronized(Balancer.this) {
-            Balancer.this.wait(1000);  // wait for targets/sources to be idle
-          }
-        } catch (InterruptedException ignored) {
-        }
-      }
-    }
-  }
+  private final Collection<Source> overUtilized = new LinkedList<Source>();
+  private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
+  private final Collection<StorageGroup> belowAvgUtilized
+      = new LinkedList<StorageGroup>();
+  private final Collection<StorageGroup> underUtilized
+      = new LinkedList<StorageGroup>();
 
   /* Check that this Balancer is compatible with the Block Placement Policy
    * used by the Namenode.
    */
   private static void checkReplicationPolicyCompatibility(Configuration conf
       ) throws UnsupportedActionException {
-    if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof 
+    if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof 
         BlockPlacementPolicyDefault)) {
       throw new UnsupportedActionException(
           "Balancer without BlockPlacementPolicyDefault");
@@ -857,190 +210,185 @@ public class Balancer {
    * when connection fails.
    */
   Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
+    final long movedWinWidth = conf.getLong(
+        DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
+        DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+    final int moverThreads = conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
+        DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
+    final int dispatcherThreads = conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
+        DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
+    final int maxConcurrentMovesPerNode = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+
+    this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
+        p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
+        maxConcurrentMovesPerNode, conf);
     this.threshold = p.threshold;
     this.policy = p.policy;
-    this.nnc = theblockpool;
-    cluster = NetworkTopology.getInstance(conf);
-
-    this.moverExecutor = Executors.newFixedThreadPool(
-            conf.getInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
-                        DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT));
-    this.dispatcherExecutor = Executors.newFixedThreadPool(
-            conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
-                        DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
   }
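
The four knobs read in this constructor can be tuned through Configuration; a minimal sketch, using only the DFSConfigKeys constants that appear above (the values shown are illustrative, not recommendations):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // illustrative sketch: override the balancer's window and threading knobs
    Configuration conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 5400 * 1000L);
    conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1000);
    conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 200);
    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 5);
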
   
-  /* Given a data node set, build a network topology and decide
-   * over-utilized datanodes, above average utilized datanodes, 
-   * below average utilized datanodes, and underutilized datanodes. 
-   * The input data node set is shuffled before the datanodes 
-   * are put into the over-utilized datanodes, above average utilized
-   * datanodes, below average utilized datanodes, and
-   * underutilized datanodes lists. This will add some randomness
-   * to the node matching later on.
-   * 
-   * @return the total number of bytes that need to be moved
-   *                to make the cluster balanced.
-   * @param datanodes a set of datanodes
+  private static long getCapacity(DatanodeStorageReport report, StorageType t) {
+    long capacity = 0L;
+    for(StorageReport r : report.getStorageReports()) {
+      if (r.getStorage().getStorageType() == t) {
+        capacity += r.getCapacity();
+      }
+    }
+    return capacity;
+  }
+
+  private static long getRemaining(DatanodeStorageReport report, StorageType t) {
+    long remaining = 0L;
+    for(StorageReport r : report.getStorageReports()) {
+      if (r.getStorage().getStorageType() == t) {
+        remaining += r.getRemaining();
+      }
+    }
+    return remaining;
+  }
+
+  /**
+   * Given a datanode storage set, build a network topology and decide
+   * over-utilized storages, above average utilized storages, 
+   * below average utilized storages, and underutilized storages. 
+   * The input datanode storage set is shuffled in order to add randomness
+   * to the storage matching later on.
+   *
+   * @return the number of bytes needed to move in order to balance the cluster.
    */
-  private long initNodes(DatanodeInfo[] datanodes) {
+  private long init(List<DatanodeStorageReport> reports) {
     // compute average utilization
-    for (DatanodeInfo datanode : datanodes) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
-        continue; // ignore decommissioning or decommissioned nodes
-      }
-      policy.accumulateSpaces(datanode);
+    for (DatanodeStorageReport r : reports) {
+      policy.accumulateSpaces(r);
     }
     policy.initAvgUtilization();
 
-    /*create network topology and all data node lists: 
-     * overloaded, above-average, below-average, and underloaded
-     * we alternate between accessing the given datanodes array
-     * in increasing order and in decreasing order.
-     */  
+    // create network topology and classify utilization collections: 
+    //   over-utilized, above-average, below-average and under-utilized.
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
-    for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
-        continue; // ignore decommissioning or decommissioned nodes
-      }
-      cluster.add(datanode);
-      BalancerDatanode datanodeS;
-      final double avg = policy.getAvgUtilization();
-      if (policy.getUtilization(datanode) >= avg) {
-        datanodeS = new Source(datanode, policy, threshold);
-        if (isAboveAvgUtilized(datanodeS)) {
-          this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
-        } else {
-          assert(isOverUtilized(datanodeS)) :
-            datanodeS.getDisplayName() + " is not an overUtilized node";
-          this.overUtilizedDatanodes.add((Source)datanodeS);
-          overLoadedBytes += (long)((datanodeS.utilization-avg
-              -threshold)*datanodeS.datanode.getCapacity()/100.0);
+    for(DatanodeStorageReport r : reports) {
+      final DDatanode dn = dispatcher.newDatanode(r);
+      for(StorageType t : StorageType.asList()) {
+        final Double utilization = policy.getUtilization(r, t);
+        if (utilization == null) { // datanode does not have such storage type 
+          continue;
         }
-      } else {
-        datanodeS = new BalancerDatanode(datanode, policy, threshold);
-        if ( isBelowOrEqualAvgUtilized(datanodeS)) {
-          this.belowAvgUtilizedDatanodes.add(datanodeS);
+        
+        final long capacity = getCapacity(r, t);
+        final double utilizationDiff = utilization - policy.getAvgUtilization(t);
+        final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
+        final long maxSize2Move = computeMaxSize2Move(capacity,
+            getRemaining(r, t), utilizationDiff, threshold);
+
+        final StorageGroup g;
+        if (utilizationDiff > 0) {
+          final Source s = dn.addSource(t, maxSize2Move, dispatcher);
+          if (thresholdDiff <= 0) { // within threshold
+            aboveAvgUtilized.add(s);
+          } else {
+            overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            overUtilized.add(s);
+          }
+          g = s;
         } else {
-          assert isUnderUtilized(datanodeS) : "isUnderUtilized("
-              + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS)
-              + ", utilization=" + datanodeS.utilization; 
-          this.underUtilizedDatanodes.add(datanodeS);
-          underLoadedBytes += (long)((avg-threshold-
-              datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
+          g = dn.addStorageGroup(t, maxSize2Move);
+          if (thresholdDiff <= 0) { // within threshold
+            belowAvgUtilized.add(g);
+          } else {
+            underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            underUtilized.add(g);
+          }
         }
+        dispatcher.getStorageGroupMap().put(g);
       }
-      datanodeMap.put(datanode.getDatanodeUuid(), datanodeS);
     }
 
-    //logging
-    logNodes();
+    logUtilizationCollections();
     
-    assert (this.datanodeMap.size() == 
-      overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
-      aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
-      : "Mismatched number of datanodes";
+    Preconditions.checkState(dispatcher.getStorageGroupMap().size()
+        == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
+           + belowAvgUtilized.size(),
+        "Mismatched number of storage groups");
     
     // return number of bytes to be moved in order to make the cluster balanced
     return Math.max(overLoadedBytes, underLoadedBytes);
   }
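
(To make the classification concrete with illustrative numbers: if the average utilization for a storage type is 50% and threshold = 10, a storage group at 75% has utilizationDiff = +25 and thresholdDiff = 15, so it becomes a Source in overUtilized and adds precentage2bytes(15, capacity) to overLoadedBytes; a group at 55% has thresholdDiff = -5 and lands in aboveAvgUtilized; a group at 30% has utilizationDiff = -20 and thresholdDiff = 10, so it lands in underUtilized and contributes to underLoadedBytes.)
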
 
-  /* log the over utilized & under utilized nodes */
-  private void logNodes() {
-    logNodes("over-utilized", overUtilizedDatanodes);
-    if (LOG.isTraceEnabled()) {
-      logNodes("above-average", aboveAvgUtilizedDatanodes);
-      logNodes("below-average", belowAvgUtilizedDatanodes);
-    }
-    logNodes("underutilized", underUtilizedDatanodes);
-  }
-
-  private static <T extends BalancerDatanode> void logNodes(
-      String name, Collection<T> nodes) {
-    LOG.info(nodes.size() + " " + name + ": " + nodes);
+  private static long computeMaxSize2Move(final long capacity, final long remaining,
+      final double utilizationDiff, final double threshold) {
+    final double diff = Math.min(threshold, Math.abs(utilizationDiff));
+    long maxSizeToMove = precentage2bytes(diff, capacity);
+    if (utilizationDiff < 0) {
+      maxSizeToMove = Math.min(remaining, maxSizeToMove);
+    }
+    return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
   }
 
-  /** A matcher interface for matching nodes. */
-  private interface Matcher {
-    /** Given the cluster topology, does the left node match the right node? */
-    boolean match(NetworkTopology cluster, Node left,  Node right);
+  private static long precentage2bytes(double precentage, long capacity) {
+    Preconditions.checkArgument(precentage >= 0,
+        "precentage = " + precentage + " < 0");
+    return (long)(precentage * capacity / 100.0);
   }
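
(Worked through with illustrative numbers: precentage2bytes(5.0, 2 TB) is 5% of 2 TB = 100 GB. In computeMaxSize2Move with capacity = 2 TB, remaining = 30 GB, utilizationDiff = -8 and threshold = 5: diff = min(5, 8) = 5, giving 100 GB; because utilizationDiff < 0 the group is a target and is capped at remaining = 30 GB; the final cap is MAX_SIZE_TO_MOVE = 10 GB.)
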
 
-  /** Match datanodes in the same node group. */
-  static final Matcher SAME_NODE_GROUP = new Matcher() {
-    @Override
-    public boolean match(NetworkTopology cluster, Node left, Node right) {
-      return cluster.isOnSameNodeGroup(left, right);
-    }
-  };
-
-  /** Match datanodes in the same rack. */
-  static final Matcher SAME_RACK = new Matcher() {
-    @Override
-    public boolean match(NetworkTopology cluster, Node left, Node right) {
-      return cluster.isOnSameRack(left, right);
+  /* log the over utilized & under utilized nodes */
+  private void logUtilizationCollections() {
+    logUtilizationCollection("over-utilized", overUtilized);
+    if (LOG.isTraceEnabled()) {
+      logUtilizationCollection("above-average", aboveAvgUtilized);
+      logUtilizationCollection("below-average", belowAvgUtilized);
     }
-  };
+    logUtilizationCollection("underutilized", underUtilized);
+  }
 
-  /** Match any datanode with any other datanode. */
-  static final Matcher ANY_OTHER = new Matcher() {
-    @Override
-    public boolean match(NetworkTopology cluster, Node left, Node right) {
-      return left != right;
-    }
-  };
+  private static <T extends StorageGroup>
+      void logUtilizationCollection(String name, Collection<T> items) {
+    LOG.info(items.size() + " " + name + ": " + items);
+  }
 
   /**
    * Decide all <source, target> pairs and
    * the number of bytes to move from a source to a target
-   * Maximum bytes to be moved per node is
-   * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
-   * Return total number of bytes to move in this iteration
+   * Maximum bytes to be moved per storage group is
+   * min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
+   * @return total number of bytes to move in this iteration
    */
-  private long chooseNodes() {
+  private long chooseStorageGroups() {
     // First, match nodes on the same node group if cluster is node group aware
-    if (cluster.isNodeGroupAware()) {
-      chooseNodes(SAME_NODE_GROUP);
+    if (dispatcher.getCluster().isNodeGroupAware()) {
+      chooseStorageGroups(Matcher.SAME_NODE_GROUP);
     }
     
     // Then, match nodes on the same rack
-    chooseNodes(SAME_RACK);
+    chooseStorageGroups(Matcher.SAME_RACK);
     // At last, match all remaining nodes
-    chooseNodes(ANY_OTHER);
+    chooseStorageGroups(Matcher.ANY_OTHER);
     
-    assert (datanodeMap.size() >= sources.size()+targets.size())
-      : "Mismatched number of datanodes (" +
-      datanodeMap.size() + " total, " +
-      sources.size() + " sources, " +
-      targets.size() + " targets)";
-
-    long bytesToMove = 0L;
-    for (Source src : sources) {
-      bytesToMove += src.getScheduledSize();
-    }
-    return bytesToMove;
+    return dispatcher.bytesToMove();
   }
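
The Matcher constants referenced above replace the anonymous-class matchers deleted earlier in this hunk; they now live outside Balancer as part of this commit's Dispatcher refactoring. A minimal sketch of such a holder, assuming the three constants keep exactly the semantics of the removed code:

    // illustrative sketch only; the real Matcher ships elsewhere in this commit
    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.Node;

    public interface Matcher {
      /** Given the cluster topology, does the left node match the right node? */
      boolean match(NetworkTopology cluster, Node left, Node right);

      /** Match datanodes in the same node group. */
      Matcher SAME_NODE_GROUP = new Matcher() {
        @Override
        public boolean match(NetworkTopology cluster, Node left, Node right) {
          return cluster.isOnSameNodeGroup(left, right);
        }
      };

      /** Match datanodes in the same rack. */
      Matcher SAME_RACK = new Matcher() {
        @Override
        public boolean match(NetworkTopology cluster, Node left, Node right) {
          return cluster.isOnSameRack(left, right);
        }
      };

      /** Match any datanode with any other datanode. */
      Matcher ANY_OTHER = new Matcher() {
        @Override
        public boolean match(NetworkTopology cluster, Node left, Node right) {
          return left != right;
        }
      };
    }
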
 
   /** Decide all <source, target> pairs according to the matcher. */
-  private void chooseNodes(final Matcher matcher) {
+  private void chooseStorageGroups(final Matcher matcher) {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
-    chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
+    chooseStorageGroups(overUtilized, underUtilized, matcher);
     
     /* match each remaining overutilized datanode (source) to 
      * below average utilized datanodes (targets).
     * Note that only overutilized datanodes that haven't yet had their max
     * bytes to move satisfied in step 1 are selected
      */
-    chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
+    chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
 
     /* match each remaining underutilized datanode (target) to 
      * above average utilized datanodes (source).
     * Note that only underutilized datanodes that have not yet had their max
     * bytes to move satisfied in step 1 are selected.
      */
-    chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
+    chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
   }
 
   /**
@@ -1048,13 +396,13 @@ public class Balancer {
    * datanodes or the candidates are source nodes with (utilization > Avg), and
    * the others are target nodes with (utilization < Avg).
    */
-  private <D extends BalancerDatanode, C extends BalancerDatanode> void 
-      chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
+  private <G extends StorageGroup, C extends StorageGroup>
+      void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
           Matcher matcher) {
-    for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
-      final D datanode = i.next();
-      for(; chooseForOneDatanode(datanode, candidates, matcher); );
-      if (!datanode.hasSpaceForScheduling()) {
+    for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
+      final G g = i.next();
+      for(; choose4One(g, candidates, matcher); );
+      if (!g.hasSpaceForScheduling()) {
         i.remove();
       }
     }
@@ -1064,18 +412,18 @@ public class Balancer {
    * For the given datanode, choose a candidate and then schedule it.
   * @return true if a candidate is chosen; false if no candidate is chosen.
    */
-  private <C extends BalancerDatanode> boolean chooseForOneDatanode(
-      BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
+  private <C extends StorageGroup> boolean choose4One(StorageGroup g,
+      Collection<C> candidates, Matcher matcher) {
     final Iterator<C> i = candidates.iterator();
-    final C chosen = chooseCandidate(dn, i, matcher);
-
+    final C chosen = chooseCandidate(g, i, matcher);
+  
     if (chosen == null) {
       return false;
     }
-    if (dn instanceof Source) {
-      matchSourceWithTargetToMove((Source)dn, chosen);
+    if (g instanceof Source) {
+      matchSourceWithTargetToMove((Source)g, chosen);
     } else {
-      matchSourceWithTargetToMove((Source)chosen, dn);
+      matchSourceWithTargetToMove((Source)chosen, g);
     }
     if (!chosen.hasSpaceForScheduling()) {
       i.remove();
@@ -1083,27 +431,26 @@ public class Balancer {
     return true;
   }
   
-  private void matchSourceWithTargetToMove(
-      Source source, BalancerDatanode target) {
+  private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
     long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
-    NodeTask nodeTask = new NodeTask(target, size);
-    source.addNodeTask(nodeTask);
-    target.incScheduledSize(nodeTask.getSize());
-    sources.add(source);
-    targets.add(target);
+    final Task task = new Task(target, size);
+    source.addTask(task);
+    target.incScheduledSize(task.getSize());
+    dispatcher.add(source, target);
     LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-        +source.datanode.getName() + " to " + target.datanode.getName());
+        + source.getDisplayName() + " to " + target.getDisplayName());
   }
   
   /** Choose a candidate for the given datanode. */
-  private <D extends BalancerDatanode, C extends BalancerDatanode>
-      C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
-    if (dn.hasSpaceForScheduling()) {
+  private <G extends StorageGroup, C extends StorageGroup>
+      C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
+    if (g.hasSpaceForScheduling()) {
       for(; candidates.hasNext(); ) {
         final C c = candidates.next();
         if (!c.hasSpaceForScheduling()) {
           candidates.remove();
-        } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
+        } else if (matcher.match(dispatcher.getCluster(),
+            g.getDatanodeInfo(), c.getDatanodeInfo())) {
           return c;
         }
       }
@@ -1111,283 +458,25 @@ public class Balancer {
     return null;
   }
 
-  private static class BytesMoved {
-    private long bytesMoved = 0L;
-    private synchronized void inc( long bytes ) {
-      bytesMoved += bytes;
-    }
-
-    private synchronized long get() {
-      return bytesMoved;
-    }
-  };
-  private final BytesMoved bytesMoved = new BytesMoved();
-  
-  /* Start a thread to dispatch block moves for each source. 
-   * The thread selects blocks to move & sends request to proxy source to
-   * initiate block move. The process is flow controlled. Block selection is
-   * blocked if there are too many un-confirmed block moves.
-   * Return the total number of bytes successfully moved in this iteration.
-   */
-  private long dispatchBlockMoves() throws InterruptedException {
-    long bytesLastMoved = bytesMoved.get();
-    Future<?>[] futures = new Future<?>[sources.size()];
-    int i=0;
-    for (Source source : sources) {
-      futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
-    }
-    
-    // wait for all dispatcher threads to finish
-    for (Future<?> future : futures) {
-      try {
-        future.get();
-      } catch (ExecutionException e) {
-        LOG.warn("Dispatcher thread failed", e.getCause());
-      }
-    }
-    
-    // wait for all block moving to be done
-    waitForMoveCompletion();
-    
-    return bytesMoved.get()-bytesLastMoved;
-  }
-  
-  // The sleeping period before checking if block move is completed again
-  static private long blockMoveWaitTime = 30000L;
-  
-  /** set the sleeping period for block move completion check */
-  static void setBlockMoveWaitTime(long time) {
-    blockMoveWaitTime = time;
-  }
-  
-  /* wait for all block move confirmations 
-   * by checking each target's pendingMove queue 
-   */
-  private void waitForMoveCompletion() {
-    boolean shouldWait;
-    do {
-      shouldWait = false;
-      for (BalancerDatanode target : targets) {
-        if (!target.isPendingQEmpty()) {
-          shouldWait = true;
-        }
-      }
-      if (shouldWait) {
-        try {
-          Thread.sleep(blockMoveWaitTime);
-        } catch (InterruptedException ignored) {
-        }
-      }
-    } while (shouldWait);
-  }
-
-  /** This window makes sure to keep blocks that have been moved within the
-   * last 1.5 hours.
-   * The old window has blocks that are older;
-   * the current window has blocks that are more recent.
-   * The cleanup method triggers the check if blocks in the old window are
-   * more than 1.5 hours old. If yes, purge the old window and then
-   * move blocks in current window to old window.
-   */ 
-  private static class MovedBlocks {
-    private long lastCleanupTime = Time.now();
-    final private static int CUR_WIN = 0;
-    final private static int OLD_WIN = 1;
-    final private static int NUM_WINS = 2;
-    final private List<HashMap<Block, BalancerBlock>> movedBlocks = 
-      new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS);
-    
-    /* initialize the moved blocks collection */
-    private MovedBlocks() {
-      movedBlocks.add(new HashMap<Block,BalancerBlock>());
-      movedBlocks.add(new HashMap<Block,BalancerBlock>());
-    }
-
-    /* add a block thus marking a block to be moved */
-    synchronized private void add(BalancerBlock block) {
-      movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
-    }
-
-    /* check if a block is marked as moved */
-    synchronized private boolean contains(BalancerBlock block) {
-      return contains(block.getBlock());
-    }
-
-    /* check if a block is marked as moved */
-    synchronized private boolean contains(Block block) {
-      return movedBlocks.get(CUR_WIN).containsKey(block) ||
-        movedBlocks.get(OLD_WIN).containsKey(block);
-    }
-
-    /* remove old blocks */
-    synchronized private void cleanup() {
-      long curTime = Time.now();
-      // check if old win is older than winWidth
-      if (lastCleanupTime + WIN_WIDTH <= curTime) {
-        // purge the old window
-        movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
-        movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
-        lastCleanupTime = curTime;
-      }
-    }
-  }
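
(To illustrate the two-window scheme being deleted here: with WIN_WIDTH = 1.5 hours, a block recorded at t = 0 sits in the current window; a cleanup() at t = 1.6 h rotates it into the old window, where contains() still reports it; the next cleanup() at least 1.5 hours later drops it. A moved block is therefore remembered for between 1.5 and 3 hours.)
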
-
-  /* Decide if it is OK to move the given block from source to target
-   * A block is a good candidate if
-   * 1. the block is not in the process of being moved/has not been moved;
-   * 2. the block does not have a replica on the target;
-   * 3. doing the move does not reduce the number of racks that the block has
-   */
-  private boolean isGoodBlockCandidate(Source source, 
-      BalancerDatanode target, BalancerBlock block) {
-    // check if the block is moved or not
-    if (movedBlocks.contains(block)) {
-        return false;
-    }
-    if (block.isLocatedOnDatanode(target)) {
-      return false;
-    }
-    if (cluster.isNodeGroupAware() && 
-        isOnSameNodeGroupWithReplicas(target, block, source)) {
-      return false;
-    }
-
-    boolean goodBlock = false;
-    if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
-      // good if source and target are on the same rack
-      goodBlock = true;
-    } else {
-      boolean notOnSameRack = true;
-      synchronized (block) {
-        for (BalancerDatanode loc : block.locations) {
-          if (cluster.isOnSameRack(loc.datanode, target.datanode)) {
-            notOnSameRack = false;
-            break;
-          }
-        }
-      }
-      if (notOnSameRack) {
-        // good if target is not on the same rack as any replica
-        goodBlock = true;
-      } else {
-        // good if source is on the same rack as one of the replicas
-        for (BalancerDatanode loc : block.locations) {
-          if (loc != source && 
-              cluster.isOnSameRack(loc.datanode, source.datanode)) {
-            goodBlock = true;
-            break;
-          }
-        }
-      }
-    }
-    return goodBlock;
-  }
-
-  /**
-   * Check if there are any replicas (other than source) on the same node group
-   * as the target. If so, the target is not a good candidate for placing
-   * this specific block replica, as we don't want 2 replicas under the same
-   * nodegroup after balancing.
-   * @param target targetDataNode
-   * @param block dataBlock
-   * @param source sourceDataNode
-   * @return true if there are any replicas (other than source) on the same
-   * node group as the target
-   */
-  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
-      BalancerBlock block, Source source) {
-    for (BalancerDatanode loc : block.locations) {
-      if (loc != source && 
-        cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
-          return true;
-        }
-      }
-    return false;
-  }
-
   /* reset all fields in a balancer preparing for the next iteration */
   private void resetData(Configuration conf) {
-    this.cluster = NetworkTopology.getInstance(conf);
-    this.overUtilizedDatanodes.clear();
-    this.aboveAvgUtilizedDatanodes.clear();
-    this.belowAvgUtilizedDatanodes.clear();
-    this.underUtilizedDatanodes.clear();
-    this.datanodeMap.clear();
-    this.sources.clear();
-    this.targets.clear();  
+    this.overUtilized.clear();
+    this.aboveAvgUtilized.clear();
+    this.belowAvgUtilized.clear();
+    this.underUtilized.clear();
     this.policy.reset();
-    cleanGlobalBlockList();
-    this.movedBlocks.cleanup();
-  }
-  
-  /* Remove all blocks from the global block list except for the ones in the
-   * moved list.
-   */
-  private void cleanGlobalBlockList() {
-    for (Iterator<Block> globalBlockListIterator=globalBlockList.keySet().iterator();
-    globalBlockListIterator.hasNext();) {
-      Block block = globalBlockListIterator.next();
-      if(!movedBlocks.contains(block)) {
-        globalBlockListIterator.remove();
-      }
-    }
-  }
-  
-  /* Return true if the given datanode is overUtilized */
-  private boolean isOverUtilized(BalancerDatanode datanode) {
-    return datanode.utilization > (policy.getAvgUtilization()+threshold);
-  }
-  
-  /* Return true if the given datanode is above or equal to average utilized
-   * but not overUtilized */
-  private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
-    final double avg = policy.getAvgUtilization();
-    return (datanode.utilization <= (avg+threshold))
-        && (datanode.utilization >= avg);
+    dispatcher.reset(conf);
   }
   
-  /* Return true if the given datanode is underUtilized */
-  private boolean isUnderUtilized(BalancerDatanode datanode) {
-    return datanode.utilization < (policy.getAvgUtilization()-threshold);
-  }
-
-  /* Return true if the given datanode is below average utilized 
-   * but not underUtilized */
-  private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
-    final double avg = policy.getAvgUtilization();
-    return (datanode.utilization >= (avg-threshold))
-             && (datanode.utilization <= avg);
-  }
-
-  // Exit status
-  enum ReturnStatus {
-    // These int values will map directly to the balancer process's exit code.
-    SUCCESS(0),
-    IN_PROGRESS(1),
-    ALREADY_RUNNING(-1),
-    NO_MOVE_BLOCK(-2),
-    NO_MOVE_PROGRESS(-3),
-    IO_EXCEPTION(-4),
-    ILLEGAL_ARGS(-5),
-    INTERRUPTED(-6);
-
-    final int code;
-
-    ReturnStatus(int code) {
-      this.code = code;
-    }
-  }
-
   /** Run an iteration for all datanodes. */
-  private ReturnStatus run(int iteration, Formatter formatter,
+  private ExitStatus run(int iteration, Formatter formatter,
       Configuration conf) {
     try {
-      /* get all live datanodes of a cluster and their disk usage
-       * decide the number of bytes need to be moved
-       */
-      final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+      final List<DatanodeStorageReport> reports = dispatcher.init();
+      final long bytesLeftToMove = init(reports);
       if (bytesLeftToMove == 0) {
         System.out.println("The cluster is balanced. Exiting...");
-        return ReturnStatus.SUCCESS;
+        return ExitStatus.SUCCESS;
       } else {
         LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
             + " to make the cluster balanced." );
@@ -1398,10 +487,10 @@ public class Balancer {
        * in this iteration. Maximum bytes to be moved per node is
        * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
        */
-      final long bytesToMove = chooseNodes();
+      final long bytesToMove = chooseStorageGroups();
       if (bytesToMove == 0) {
         System.out.println("No block can be moved. Exiting...");
-        return ReturnStatus.NO_MOVE_BLOCK;
+        return ExitStatus.NO_MOVE_BLOCK;
       } else {
         LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
             " in this iteration");
@@ -1410,7 +499,7 @@ public class Balancer {
       formatter.format("%-24s %10d  %19s  %18s  %17s%n",
           DateFormat.getDateTimeInstance().format(new Date()),
           iteration,
-          StringUtils.byteDesc(bytesMoved.get()),
+          StringUtils.byteDesc(dispatcher.getBytesMoved()),
           StringUtils.byteDesc(bytesLeftToMove),
           StringUtils.byteDesc(bytesToMove)
           );
@@ -1421,24 +510,22 @@ public class Balancer {
        * available to move.
      * Exit if no byte has been moved for 5 consecutive iterations.
        */
-      if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
-        return ReturnStatus.NO_MOVE_PROGRESS;
+      if (!dispatcher.dispatchAndCheckContinue()) {
+        return ExitStatus.NO_MOVE_PROGRESS;
       }
 
-      return ReturnStatus.IN_PROGRESS;
+      return ExitStatus.IN_PROGRESS;
     } catch (IllegalArgumentException e) {
       System.out.println(e + ".  Exiting ...");
-      return ReturnStatus.ILLEGAL_ARGS;
+      return ExitStatus.ILLEGAL_ARGUMENTS;
     } catch (IOException e) {
       System.out.println(e + ".  Exiting ...");
-      return ReturnStatus.IO_EXCEPTION;
+      return ExitStatus.IO_EXCEPTION;
     } catch (InterruptedException e) {
       System.out.println(e + ".  Exiting ...");
-      return ReturnStatus.INTERRUPTED;
+      return ExitStatus.INTERRUPTED;
     } finally {
-      // shutdown thread pools
-      dispatcherExecutor.shutdownNow();
-      moverExecutor.shutdownNow();
+      dispatcher.shutdownNow();
     }
   }
 
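The removed ReturnStatus enum above is superseded by a shared ExitStatus enum whose definition is not included in this mail. A minimal reconstruction, assuming only the values and renames visible in this hunk (ILLEGAL_ARGS becomes ILLEGAL_ARGUMENTS, and the code field gains a getExitCode() accessor):

    // Sketch only; reconstructed from the removed ReturnStatus values and
    // the ExitStatus uses visible in this hunk. Not the committed file.
    public enum ExitStatus {
      SUCCESS(0), IN_PROGRESS(1), ALREADY_RUNNING(-1), NO_MOVE_BLOCK(-2),
      NO_MOVE_PROGRESS(-3), IO_EXCEPTION(-4), ILLEGAL_ARGUMENTS(-5),
      INTERRUPTED(-6);

      private final int code;

      ExitStatus(int code) {
        this.code = code;
      }

      /** @return the process exit code for this status. */
      public int getExitCode() {
        return code;
      }
    }
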
@@ -1453,8 +540,8 @@ public class Balancer {
     final long sleeptime = 2000*conf.getLong(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
-    LOG.info("namenodes = " + namenodes);
-    LOG.info("p         = " + p);
+    LOG.info("namenodes  = " + namenodes);
+    LOG.info("parameters = " + p);
     
     final Formatter formatter = new Formatter(System.out);
     System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
@@ -1463,7 +550,10 @@ public class Balancer {
         = new ArrayList<NameNodeConnector>(namenodes.size());
     try {
       for (URI uri : namenodes) {
-        connectors.add(new NameNodeConnector(uri, conf));
+        final NameNodeConnector nnc = new NameNodeConnector(
+            Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf);
+        nnc.getKeyManager().startBlockKeyUpdater();
+        connectors.add(nnc);
       }
     
       boolean done = false;
@@ -1472,14 +562,14 @@ public class Balancer {
         Collections.shuffle(connectors);
         for(NameNodeConnector nnc : connectors) {
           final Balancer b = new Balancer(nnc, p, conf);
-          final ReturnStatus r = b.run(iteration, formatter, conf);
+          final ExitStatus r = b.run(iteration, formatter, conf);
           // clean all lists
           b.resetData(conf);
-          if (r == ReturnStatus.IN_PROGRESS) {
+          if (r == ExitStatus.IN_PROGRESS) {
             done = false;
-          } else if (r != ReturnStatus.SUCCESS) {
+          } else if (r != ExitStatus.SUCCESS) {
            // must be an error status; return.
-            return r.code;
+            return r.getExitCode();
           }
         }
 
@@ -1492,7 +582,7 @@ public class Balancer {
         nnc.close();
       }
     }
-    return ReturnStatus.SUCCESS.code;
+    return ExitStatus.SUCCESS.getExitCode();
   }
 
   /* Given elapsedTime in ms, return a printable string */
@@ -1516,21 +606,31 @@ public class Balancer {
   }
 
   static class Parameters {
-    static final Parameters DEFALUT = new Parameters(
-        BalancingPolicy.Node.INSTANCE, 10.0);
+    static final Parameters DEFAULT = new Parameters(
+        BalancingPolicy.Node.INSTANCE, 10.0,
+        Collections.<String> emptySet(), Collections.<String> emptySet());
 
     final BalancingPolicy policy;
     final double threshold;
+    // exclude the nodes in this set from balancing operations
+    Set<String> nodesToBeExcluded;
+    // include only these nodes in balancing operations
+    Set<String> nodesToBeIncluded;
 
-    Parameters(BalancingPolicy policy, double threshold) {
+    Parameters(BalancingPolicy policy, double threshold,
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
       this.policy = policy;
       this.threshold = threshold;
+      this.nodesToBeExcluded = nodesToBeExcluded;
+      this.nodesToBeIncluded = nodesToBeIncluded;
     }
 
     @Override
     public String toString() {
       return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
-          + "[" + policy + ", threshold=" + threshold + "]";
+          + "[" + policy + ", threshold=" + threshold +
+          ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
+          ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
     }
   }
 
@@ -1545,9 +645,6 @@ public class Balancer {
     public int run(String[] args) {
       final long startTime = Time.now();
       final Configuration conf = getConf();
-      WIN_WIDTH = conf.getLong(
-          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 
-          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
 
       try {
         checkReplicationPolicyCompatibility(conf);
@@ -1556,26 +653,29 @@ public class Balancer {
         return Balancer.run(namenodes, parse(args), conf);
       } catch (IOException e) {
         System.out.println(e + ".  Exiting ...");
-        return ReturnStatus.IO_EXCEPTION.code;
+        return ExitStatus.IO_EXCEPTION.getExitCode();
       } catch (InterruptedException e) {
         System.out.println(e + ".  Exiting ...");
-        return ReturnStatus.INTERRUPTED.code;
+        return ExitStatus.INTERRUPTED.getExitCode();
       } finally {
+        System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
         System.out.println("Balancing took " + time2Str(Time.now()-startTime));
       }
     }
 
     /** parse command line arguments */
     static Parameters parse(String[] args) {
-      BalancingPolicy policy = Parameters.DEFALUT.policy;
-      double threshold = Parameters.DEFALUT.threshold;
+      BalancingPolicy policy = Parameters.DEFAULT.policy;
+      double threshold = Parameters.DEFAULT.threshold;
+      Set<String> nodesToBeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
+      Set<String> nodesToBeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
 
       if (args != null) {
         try {
           for(int i = 0; i < args.length; i++) {
-            checkArgument(args.length >= 2, "args = " + Arrays.toString(args));
             if ("-threshold".equalsIgnoreCase(args[i])) {
-              i++;
+              checkArgument(++i < args.length,
+                "Threshold value is missing: args = " + Arrays.toString(args));
               try {
                 threshold = Double.parseDouble(args[i]);
                 if (threshold < 1 || threshold > 100) {
@@ -1590,25 +690,52 @@ public class Balancer {
                 throw e;
               }
             } else if ("-policy".equalsIgnoreCase(args[i])) {
-              i++;
+              checkArgument(++i < args.length,
+                "Policy value is missing: args = " + Arrays.toString(args));
               try {
                 policy = BalancingPolicy.parse(args[i]);
               } catch(IllegalArgumentException e) {
                 System.err.println("Illegal policy name: " + args[i]);
                 throw e;
               }
+            } else if ("-exclude".equalsIgnoreCase(args[i])) {
+              checkArgument(++i < args.length,
+                  "List of nodes to exclude | -f <filename> is missing: args = "
+                  + Arrays.toString(args));
+              if ("-f".equalsIgnoreCase(args[i])) {
+                checkArgument(++i < args.length,
+                    "File containing nodes to exclude is not specified: args = "
+                    + Arrays.toString(args));
+                nodesToBeExcluded = Util.getHostListFromFile(args[i], "exclude");
+              } else {
+                nodesToBeExcluded = Util.parseHostList(args[i]);
+              }
+            } else if ("-include".equalsIgnoreCase(args[i])) {
+              checkArgument(++i < args.length,
+                "List of nodes to include | -f <filename> is missing: args = "
+                + Arrays.toString(args));
+              if ("-f".equalsIgnoreCase(args[i])) {
+                checkArgument(++i < args.length,
+                    "File containing nodes to include is not specified: args = "
+                    + Arrays.toString(args));
+                nodesToBeIncluded = Util.getHostListFromFile(args[i], "include");
+              } else {
+                nodesToBeIncluded = Util.parseHostList(args[i]);
+              }
             } else {
               throw new IllegalArgumentException("args = "
                   + Arrays.toString(args));
             }
           }
+          checkArgument(nodesToBeExcluded.isEmpty() || nodesToBeIncluded.isEmpty(),
+              "-exclude and -include options cannot be specified together.");
         } catch(RuntimeException e) {
           printUsage(System.err);
           throw e;
         }
       }
       
-      return new Parameters(policy, threshold);
+      return new Parameters(policy, threshold, nodesToBeExcluded, nodesToBeIncluded);
     }
 
     private static void printUsage(PrintStream out) {

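As a usage illustration of the new -exclude/-include handling, a minimal sketch of calling the revised parser; it assumes the enclosing Balancer.Cli class shown above and that Util.parseHostList splits a comma-separated host list, and the host names are invented:

    // Hypothetical invocation; not part of the patch.
    String[] args = { "-threshold", "5",
        "-exclude", "dn1.example.com,dn2.example.com" };
    Balancer.Parameters p = Balancer.Cli.parse(args);
    // p.threshold          -> 5.0
    // p.nodesToBeExcluded  -> [dn1.example.com, dn2.example.com]
    // p.nodesToBeIncluded  -> empty; -exclude and -include are mutually
    //                         exclusive, as the final checkArgument enforces
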
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,11 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.hdfs.util.EnumDoubles;
 
 /**
  * Balancing policy.
@@ -28,31 +32,43 @@ import org.apache.hadoop.hdfs.protocol.D
  */
 @InterfaceAudience.Private
 abstract class BalancingPolicy {
-  long totalCapacity;
-  long totalUsedSpace;
-  private double avgUtilization;
+  final EnumCounters<StorageType> totalCapacities
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumCounters<StorageType> totalUsedSpaces
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumDoubles<StorageType> avgUtilizations
+      = new EnumDoubles<StorageType>(StorageType.class);
 
   void reset() {
-    totalCapacity = 0L;
-    totalUsedSpace = 0L;
-    avgUtilization = 0.0;
+    totalCapacities.reset();
+    totalUsedSpaces.reset();
+    avgUtilizations.reset();
   }
 
   /** Get the policy name. */
   abstract String getName();
 
   /** Accumulate used space and capacity. */
-  abstract void accumulateSpaces(DatanodeInfo d);
+  abstract void accumulateSpaces(DatanodeStorageReport r);
 
   void initAvgUtilization() {
-    this.avgUtilization = totalUsedSpace*100.0/totalCapacity;
+    for(StorageType t : StorageType.asList()) {
+      final long capacity = totalCapacities.get(t);
+      if (capacity > 0L) {
+        final double avg = totalUsedSpaces.get(t)*100.0/capacity;
+        avgUtilizations.set(t, avg);
+      }
+    }
   }
-  double getAvgUtilization() {
-    return avgUtilization;
+
+  double getAvgUtilization(StorageType t) {
+    return avgUtilizations.get(t);
   }
 
-  /** Return the utilization of a datanode */
-  abstract double getUtilization(DatanodeInfo d);
+  /** @return the utilization of a particular storage type of a datanode;
+   *          or null if the datanode does not have such a storage type.
+   */
+  abstract Double getUtilization(DatanodeStorageReport r, StorageType t);
   
   @Override
   public String toString() {
@@ -84,14 +100,25 @@ abstract class BalancingPolicy {
     }
 
     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getDfsUsed();  
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getDfsUsed());
+      }
     }
     
     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getDfsUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long dfsUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          dfsUsed += s.getDfsUsed();
+        }
+      }
+      return capacity == 0L? null: dfsUsed*100.0/capacity;
     }
   }
 
@@ -108,14 +135,25 @@ abstract class BalancingPolicy {
     }
 
     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getBlockPoolUsed();  
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getBlockPoolUsed());
+      }
     }
 
     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getBlockPoolUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long blockPoolUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          blockPoolUsed += s.getBlockPoolUsed();
+        }
+      }
+      return capacity == 0L? null: blockPoolUsed*100.0/capacity;
     }
   }
 }

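The scalar helpers removed from Balancer.java (isOverUtilized and friends) generalize naturally over this per-storage-type API. A minimal sketch of the equivalent over-utilization check, assuming a policy already initialized via accumulateSpaces() and initAvgUtilization():

    // Sketch only; mirrors the removed isOverUtilized() against the new API.
    static boolean isOverUtilized(BalancingPolicy policy,
        DatanodeStorageReport r, StorageType t, double threshold) {
      final Double utilization = policy.getUtilization(r, t);
      // null means the datanode has no storage of type t
      return utilization != null
          && utilization > policy.getAvgUtilization(t) + threshold;
    }
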
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Tue Aug 19 23:49:39 2014
@@ -17,115 +17,102 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.URI;
-import java.util.EnumSet;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
 
 /**
- * The class provides utilities for {@link Balancer} to access a NameNode
+ * The class provides utilities for accessing a NameNode.
  */
 @InterfaceAudience.Private
-class NameNodeConnector {
-  private static final Log LOG = Balancer.LOG;
-  private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+public class NameNodeConnector implements Closeable {
+  private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
+
   private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
 
-  final URI nameNodeUri;
-  final String blockpoolID;
+  private final URI nameNodeUri;
+  private final String blockpoolID;
+
+  private final NamenodeProtocol namenode;
+  private final ClientProtocol client;
+  private final KeyManager keyManager;
+
+  private final FileSystem fs;
+  private final Path idPath;
+  private final OutputStream out;
 
-  final NamenodeProtocol namenode;
-  final ClientProtocol client;
-  final FileSystem fs;
-  final OutputStream out;
-
-  private final boolean isBlockTokenEnabled;
-  private final boolean encryptDataTransfer;
-  private boolean shouldRun;
-  private long keyUpdaterInterval;
-  // used for balancer
   private int notChangedIterations = 0;
-  private BlockTokenSecretManager blockTokenSecretManager;
-  private Daemon keyupdaterthread; // AccessKeyUpdater thread
-  private DataEncryptionKey encryptionKey;
-  private final TrustedChannelResolver trustedChannelResolver;
 
-  NameNodeConnector(URI nameNodeUri,
+  public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
       Configuration conf) throws IOException {
     this.nameNodeUri = nameNodeUri;
+    this.idPath = idPath;
     
-    this.namenode =
-      NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
-        .getProxy();
-    this.client =
-      NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class)
-        .getProxy();
+    this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
+        NamenodeProtocol.class).getProxy();
+    this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
+        ClientProtocol.class).getProxy();
     this.fs = FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();
     this.blockpoolID = namespaceinfo.getBlockPoolID();
 
-    final ExportedBlockKeys keys = namenode.getBlockKeys();
-    this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
-    if (isBlockTokenEnabled) {
-      long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
-      long blockTokenLifetime = keys.getTokenLifetime();
-      LOG.info("Block token params received from NN: keyUpdateInterval="
-          + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-          + blockTokenLifetime / (60 * 1000) + " min(s)");
-      String encryptionAlgorithm = conf.get(
-          DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
-      this.blockTokenSecretManager = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
-          encryptionAlgorithm);
-      this.blockTokenSecretManager.addKeys(keys);
-      /*
-       * Balancer should sync its block keys with NN more frequently than NN
-       * updates its block keys
-       */
-      this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
-      LOG.info("Balancer will update its block keys every "
-          + keyUpdaterInterval / (60 * 1000) + " minute(s)");
-      this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
-      this.shouldRun = true;
-      this.keyupdaterthread.start();
-    }
-    this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
-        .getEncryptDataTransfer();
-    // Check if there is another balancer running.
+    final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
+    this.keyManager = new KeyManager(blockpoolID, namenode,
+        defaults.getEncryptDataTransfer(), conf);
     // Exit if there is another one running.
-    out = checkAndMarkRunningBalancer(); 
+    out = checkAndMarkRunning(); 
     if (out == null) {
-      throw new IOException("Another balancer is running");
+      throw new IOException("Another " + name + " is running.");
     }
-    this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
   }
 
-  boolean shouldContinue(long dispatchBlockMoveBytes) {
+  /** @return the block pool ID */
+  public String getBlockpoolID() {
+    return blockpoolID;
+  }
+
+  /** @return blocks with locations. */
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+      throws IOException {
+    return namenode.getBlocks(datanode, size);
+  }
+
+  /** @return live datanode storage reports. */
+  public DatanodeStorageReport[] getLiveDatanodeStorageReport()
+      throws IOException {
+    return client.getDatanodeStorageReport(DatanodeReportType.LIVE);
+  }
+
+  /** @return the key manager */
+  public KeyManager getKeyManager() {
+    return keyManager;
+  }
+
+  /** Should the instance continue running? */
+  public boolean shouldContinue(long dispatchBlockMoveBytes) {
     if (dispatchBlockMoveBytes > 0) {
       notChangedIterations = 0;
     } else {
@@ -139,53 +126,25 @@ class NameNodeConnector {
     return true;
   }
   
-  /** Get an access token for a block. */
-  Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
-      ) throws IOException {
-    if (!isBlockTokenEnabled) {
-      return BlockTokenSecretManager.DUMMY_TOKEN;
-    } else {
-      if (!shouldRun) {
-        throw new IOException(
-            "Can not get access token. BlockKeyUpdater is not running");
-      }
-      return blockTokenSecretManager.generateToken(null, eb,
-          EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
-          BlockTokenSecretManager.AccessMode.COPY));
-    }
-  }
-  
-  DataEncryptionKey getDataEncryptionKey()
-      throws IOException {
-    if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) {
-      synchronized (this) {
-        if (encryptionKey == null) {
-          encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
-        }
-        return encryptionKey;
-      }
-    } else {
-      return null;
-    }
-  }
 
-  /* The idea for making sure that there is no more than one balancer
-   * running in an HDFS is to create a file in the HDFS, writes the IP address
-   * of the machine on which the balancer is running to the file, but did not
-   * close the file until the balancer exits. 
-   * This prevents the second balancer from running because it can not
-   * creates the file while the first one is running.
+  /**
+   * The idea for making sure that there is no more than one instance
+   * running in an HDFS cluster is to create a file in HDFS, write the
+   * hostname of the machine on which the instance is running to the file,
+   * and keep the file open until the instance exits.
+   * 
+   * This prevents a second instance from running because it cannot
+   * create the file while the first one is running.
    * 
-   * This method checks if there is any running balancer and 
-   * if no, mark yes if no.
+   * This method checks if there is any running instance; if there is none,
+   * it marks this instance as the running one.
    * Note that this is an atomic operation.
    * 
-   * Return null if there is a running balancer; otherwise the output stream
-   * to the newly created file.
+   * @return null if there is a running instance;
+   *         otherwise, the output stream to the newly created file.
    */
-  private OutputStream checkAndMarkRunningBalancer() throws IOException {
+  private OutputStream checkAndMarkRunning() throws IOException {
     try {
-      final DataOutputStream out = fs.create(BALANCER_ID_PATH);
+      final DataOutputStream out = fs.create(idPath);
       out.writeBytes(InetAddress.getLocalHost().getHostName());
       out.flush();
       return out;
@@ -198,24 +157,17 @@ class NameNodeConnector {
     }
   }
 
-  /** Close the connection. */
-  void close() {
-    shouldRun = false;
-    try {
-      if (keyupdaterthread != null) {
-        keyupdaterthread.interrupt();
-      }
-    } catch(Exception e) {
-      LOG.warn("Exception shutting down access key updater thread", e);
-    }
+  @Override
+  public void close() {
+    keyManager.close();
 
     // close the output file
     IOUtils.closeStream(out); 
     if (fs != null) {
       try {
-        fs.delete(BALANCER_ID_PATH, true);
+        fs.delete(idPath, true);
       } catch(IOException ioe) {
-        LOG.warn("Failed to delete " + BALANCER_ID_PATH, ioe);
+        LOG.warn("Failed to delete " + idPath, ioe);
       }
     }
   }
@@ -223,31 +175,6 @@ class NameNodeConnector {
   @Override
   public String toString() {
     return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
-        + ", id=" + blockpoolID
-        + "]";
-  }
-
-  /**
-   * Periodically updates access keys.
-   */
-  class BlockKeyUpdater implements Runnable {
-    @Override
-    public void run() {
-      try {
-        while (shouldRun) {
-          try {
-            blockTokenSecretManager.addKeys(namenode.getBlockKeys());
-          } catch (IOException e) {
-            LOG.error("Failed to set keys", e);
-          }
-          Thread.sleep(keyUpdaterInterval);
-        }
-      } catch (InterruptedException e) {
-        LOG.debug("InterruptedException in block key updater thread", e);
-      } catch (Throwable e) {
-        LOG.error("Exception in block key updater thread", e);
-        shouldRun = false;
-      }
-    }
+        + ", bpid=" + blockpoolID + "]";
   }
 }

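Pieced together from the calls visible elsewhere in this commit, the generalized NameNodeConnector lifecycle might be used as sketched below; the namenode URI is an example and the surrounding error handling is elided:

    // Sketch only; the constructor throws IOException if another instance
    // already holds the lock file.
    Configuration conf = new HdfsConfiguration();
    NameNodeConnector nnc = new NameNodeConnector("Balancer",
        URI.create("hdfs://namenode:8020"),
        new Path("/system/balancer.id"), conf);
    try {
      nnc.getKeyManager().startBlockKeyUpdater();
      DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
      // ... choose sources and targets, dispatch block moves ...
    } finally {
      nnc.close();  // stops the key manager and deletes the lock file
    }
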
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Tue Aug 19 23:49:39 2014
@@ -21,7 +21,6 @@ import java.util.LinkedList;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.util.LightWeightGSet;
 
@@ -195,24 +194,12 @@ public class BlockInfo extends Block imp
    * Add a {@link DatanodeStorageInfo} location for a block
    */
   boolean addStorage(DatanodeStorageInfo storage) {
-    boolean added = true;
-    int idx = findDatanode(storage.getDatanodeDescriptor());
-    if(idx >= 0) {
-      if (getStorageInfo(idx) == storage) { // the storage is already there
-        return false;
-      } else {
-        // The block is on the DN but belongs to a different storage.
-        // Update our state.
-        removeStorage(storage);
-        added = false;      // Just updating storage. Return false.
-      }
-    }
     // find the last null node
     int lastNode = ensureCapacity(1);
     setStorageInfo(lastNode, storage);
     setNext(lastNode, null);
     setPrevious(lastNode, null);
-    return added;
+    return true;
   }
 
   /**
@@ -239,40 +226,39 @@ public class BlockInfo extends Block imp
 
   /**
    * Find specified DatanodeDescriptor.
-   * @param dn
-   * @return index or -1 if not found.
+   * @return true if the given datanode is in this block's list of
+   *         locations; false otherwise.
    */
-  int findDatanode(DatanodeDescriptor dn) {
+  boolean findDatanode(DatanodeDescriptor dn) {
     int len = getCapacity();
     for(int idx = 0; idx < len; idx++) {
       DatanodeDescriptor cur = getDatanode(idx);
-      if(cur == dn)
-        return idx;
-      if(cur == null)
+      if(cur == dn) {
+        return true;
+      }
+      if(cur == null) {
         break;
+      }
     }
-    return -1;
+    return false;
   }
   /**
    * Find specified DatanodeStorageInfo.
-   * @param dn
-   * @return index or -1 if not found.
+   * @return DatanodeStorageInfo or null if not found.
    */
-  int findStorageInfo(DatanodeInfo dn) {
+  DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
     int len = getCapacity();
     for(int idx = 0; idx < len; idx++) {
       DatanodeStorageInfo cur = getStorageInfo(idx);
       if(cur == null)
         break;
       if(cur.getDatanodeDescriptor() == dn)
-        return idx;
+        return cur;
     }
-    return -1;
+    return null;
   }
   
   /**
    * Find specified DatanodeStorageInfo.
-   * @param storageInfo
    * @return index or -1 if not found.
    */
   int findStorageInfo(DatanodeStorageInfo storageInfo) {

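A minimal sketch of the revised BlockInfo lookups, with blockInfo and dn assumed to be in scope: findDatanode now answers membership directly, and findStorageInfo(DatanodeDescriptor) hands back the storage itself instead of an index:

    // Sketch only; "blockInfo" and "dn" are assumed.
    if (blockInfo.findDatanode(dn)) {
      // the block has a replica on some storage of this datanode
      DatanodeStorageInfo storage = blockInfo.findStorageInfo(dn);
      // storage identifies which of the datanode's storages holds the replica
    }
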
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Tue Aug 19 23:49:39 2014
@@ -373,12 +373,14 @@ public class BlockInfoUnderConstruction 
     sb.append("{blockUCState=").append(blockUCState)
       .append(", primaryNodeIndex=").append(primaryNodeIndex)
       .append(", replicas=[");
-    Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
-    if (iter.hasNext()) {
-      iter.next().appendStringTo(sb);
-      while (iter.hasNext()) {
-        sb.append(", ");
+    if (replicas != null) {
+      Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
+      if (iter.hasNext()) {
         iter.next().appendStringTo(sb);
+        while (iter.hasNext()) {
+          sb.append(", ");
+          iter.next().appendStringTo(sb);
+        }
       }
     }
     sb.append("]}");