Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2014/08/01 03:12:25 UTC

svn commit: r1615016 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hadoop/hdfs/server/balancer/ s...

Author: szetszwo
Date: Fri Aug  1 01:12:24 2014
New Revision: 1615016

URL: http://svn.apache.org/r1615016
Log:
svn merge -c 1615015 from trunk for HDFS-6685. Balancer should preserve storage type of replicas.

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java
      - copied unchanged from r1615015, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1615015
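
The heart of HDFS-6685: the Balancer now plans moves per (datanode, storage
type) "storage group" instead of per datanode, and a block is a move candidate
only when the source and target groups share a storage type. A minimal sketch
of that invariant (the Group and MoveRule names are illustrative, not the
committed types):

    // Illustrative stand-in for the new BalancerDatanode.StorageGroup.
    enum StorageType { DISK, SSD }

    class Group {
      final String datanodeUuid;
      final StorageType storageType;

      Group(String datanodeUuid, StorageType storageType) {
        this.datanodeUuid = datanodeUuid;
        this.storageType = storageType;
      }
    }

    class MoveRule {
      // Mirrors the new first check in isGoodBlockCandidate(): balancing
      // never changes a replica's storage type.
      static boolean compatible(Group source, Group target) {
        return source.storageType == target.storageType;
      }
    }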

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1615016&r1=1615015&r2=1615016&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug  1 01:12:24 2014
@@ -83,6 +83,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-6441. Add ability to exclude/include specific datanodes while
     balancing. (Benoy Antony and Yu Li via Arpit Agarwal)
 
+    HDFS-6685. Balancer should preserve storage type of replicas.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1615015

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java?rev=1615016&r1=1615015&r2=1615016&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java Fri Aug  1 01:12:24 2014
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -32,4 +35,11 @@ public enum StorageType {
   SSD;
 
   public static final StorageType DEFAULT = DISK;
+  public static final StorageType[] EMPTY_ARRAY = {};
+  
+  private static final StorageType[] VALUES = values();
+  
+  public static List<StorageType> asList() {
+    return Arrays.asList(VALUES);
+  }
 }
\ No newline at end of file
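
StorageType.asList() wraps a cached copy of values(), avoiding the defensive
array clone that every values() call makes; because Arrays.asList() is backed
by that shared array, callers must treat the returned list as read-only. A
usage sketch:

    import java.util.List;
    import org.apache.hadoop.hdfs.StorageType;

    public class StorageTypeDemo {
      public static void main(String[] args) {
        // One pass over the fixed set of storage types, as the Balancer's
        // init() now does for each datanode storage report.
        for (StorageType t : StorageType.asList()) {
          System.out.println(t + (t == StorageType.DEFAULT ? " (default)" : ""));
        }
      }
    }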

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1615016&r1=1615015&r2=1615016&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Aug  1 01:12:24 2014
@@ -351,15 +351,19 @@ public class PBHelper {
     return BlockWithLocationsProto.newBuilder()
         .setBlock(convert(blk.getBlock()))
         .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
-        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build();
+        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
+        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
+        .build();
   }
 
   public static BlockWithLocations convert(BlockWithLocationsProto b) {
     final List<String> datanodeUuids = b.getDatanodeUuidsList();
     final List<String> storageUuids = b.getStorageUuidsList();
+    final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
     return new BlockWithLocations(convert(b.getBlock()),
         datanodeUuids.toArray(new String[datanodeUuids.size()]),
-        storageUuids.toArray(new String[storageUuids.size()]));
+        storageUuids.toArray(new String[storageUuids.size()]),
+        convertStorageTypes(storageTypes, storageUuids.size()));
   }
 
   public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
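
The convertStorageTypes helpers called above are defined elsewhere in
PBHelper.java and are not part of this hunk. A plausible sketch of the
array-producing direction, assuming entries missing from the wire message fall
back to the default type so the new proto field stays backward compatible with
older senders:

    // Sketch only, not the committed implementation.
    static StorageType[] convertStorageTypes(
        List<StorageTypeProto> protos, int expectedSize) {
      final StorageType[] types = new StorageType[expectedSize];
      for (int i = 0; i < expectedSize; i++) {
        // An older peer may omit storage types entirely; pad with DEFAULT.
        types[i] = i < protos.size()
            ? StorageType.valueOf(protos.get(i).name())
            : StorageType.DEFAULT;
      }
      return types;
    }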

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1615016&r1=1615015&r2=1615016&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Aug  1 01:12:24 2014
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumMap;
 import java.util.Formatter;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -79,6 +80,8 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -91,6 +94,8 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.base.Preconditions;
+
 /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
  * when some datanodes become full or when new empty nodes join the cluster.
  * The tool is deployed as an application program that can be run by the 
@@ -191,7 +196,9 @@ import static com.google.common.base.Pre
 @InterfaceAudience.Private
 public class Balancer {
   static final Log LOG = LogFactory.getLog(Balancer.class);
-  final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+  final private static long GB = 1L << 30; //1GB
+  final private static long MAX_SIZE_TO_MOVE = 10*GB;
+  final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
   private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
 
   /** The maximum number of concurrent blocks moves for 
@@ -222,26 +229,22 @@ public class Balancer {
   Set<String> nodesToBeIncluded;
   
   // all data node lists
-  private final Collection<Source> overUtilizedDatanodes
-                               = new LinkedList<Source>();
-  private final Collection<Source> aboveAvgUtilizedDatanodes
-                               = new LinkedList<Source>();
-  private final Collection<BalancerDatanode> belowAvgUtilizedDatanodes
-                               = new LinkedList<BalancerDatanode>();
-  private final Collection<BalancerDatanode> underUtilizedDatanodes
-                               = new LinkedList<BalancerDatanode>();
-  
-  private final Collection<Source> sources
-                               = new HashSet<Source>();
-  private final Collection<BalancerDatanode> targets
-                               = new HashSet<BalancerDatanode>();
+  private final Collection<Source> overUtilized = new LinkedList<Source>();
+  private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
+  private final Collection<BalancerDatanode.StorageGroup> belowAvgUtilized
+      = new LinkedList<BalancerDatanode.StorageGroup>();
+  private final Collection<BalancerDatanode.StorageGroup> underUtilized
+      = new LinkedList<BalancerDatanode.StorageGroup>();
+  
+  private final Collection<Source> sources = new HashSet<Source>();
+  private final Collection<BalancerDatanode.StorageGroup> targets
+      = new HashSet<BalancerDatanode.StorageGroup>();
   
   private final Map<Block, BalancerBlock> globalBlockList
                  = new HashMap<Block, BalancerBlock>();
   private final MovedBlocks movedBlocks = new MovedBlocks();
-  /** Map (datanodeUuid -> BalancerDatanodes) */
-  private final Map<String, BalancerDatanode> datanodeMap
-      = new HashMap<String, BalancerDatanode>();
+  /** Map (datanodeUuid,storageType -> StorageGroup) */
+  private final StorageGroupMap storageGroupMap = new StorageGroupMap();
   
   private NetworkTopology cluster;
 
@@ -249,12 +252,39 @@ public class Balancer {
   private final ExecutorService dispatcherExecutor;
   private final int maxConcurrentMovesPerNode;
 
+
+  private static class StorageGroupMap {
+    private static String toKey(String datanodeUuid, StorageType storageType) {
+      return datanodeUuid + ":" + storageType;
+    }
+
+    private final Map<String, BalancerDatanode.StorageGroup> map
+        = new HashMap<String, BalancerDatanode.StorageGroup>();
+    
+    BalancerDatanode.StorageGroup get(String datanodeUuid, StorageType storageType) {
+      return map.get(toKey(datanodeUuid, storageType));
+    }
+    
+    void put(BalancerDatanode.StorageGroup g) {
+      final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType);
+      final BalancerDatanode.StorageGroup existing = map.put(key, g);
+      Preconditions.checkState(existing == null);
+    }
+    
+    int size() {
+      return map.size();
+    }
+    
+    void clear() {
+      map.clear();
+    }
+  }
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
     private BalancerBlock block;
     private Source source;
     private BalancerDatanode proxySource;
-    private BalancerDatanode target;
+    private BalancerDatanode.StorageGroup target;
     
     /** constructor */
     private PendingBlockMove() {
@@ -265,7 +295,7 @@ public class Balancer {
       final Block b = block.getBlock();
       return b + " with size=" + b.getNumBytes() + " from "
           + source.getDisplayName() + " to " + target.getDisplayName()
-          + " through " + proxySource.getDisplayName();
+          + " through " + proxySource.datanode;
     }
 
     /* choose a block & a proxy source for this pendingMove 
@@ -317,20 +347,20 @@ public class Balancer {
       final DatanodeInfo targetDN = target.getDatanode();
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
-        for (BalancerDatanode loc : block.getLocations()) {
+        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
           if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) {
             return true;
           }
         }
       }
       // check if there is replica which is on the same rack with the target
-      for (BalancerDatanode loc : block.getLocations()) {
+      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
         if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
           return true;
         }
       }
       // find out a non-busy replica
-      for (BalancerDatanode loc : block.getLocations()) {
+      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
         if (addTo(loc)) {
           return true;
         }
@@ -338,8 +368,9 @@ public class Balancer {
       return false;
     }
     
-    // add a BalancerDatanode as proxy source for specific block movement
-    private boolean addTo(BalancerDatanode bdn) {
+    /** add to a proxy source for specific block movement */
+    private boolean addTo(BalancerDatanode.StorageGroup g) {
+      final BalancerDatanode bdn = g.getBalancerDatanode();
       if (bdn.addPendingBlock(this)) {
         proxySource = bdn;
         return true;
@@ -355,7 +386,7 @@ public class Balancer {
       DataInputStream in = null;
       try {
         sock.connect(
-            NetUtils.createSocketAddr(target.datanode.getXferAddr()),
+            NetUtils.createSocketAddr(target.getDatanode().getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
         /* Unfortunately we don't have a good way to know if the Datanode is
          * taking a really long time to move a block, OR something has
@@ -372,7 +403,7 @@ public class Balancer {
         ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
         Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, nnc, accessToken, target.datanode);
+          unbufIn, nnc, accessToken, target.getDatanode());
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
@@ -392,14 +423,14 @@ public class Balancer {
          * gets out of sync with work going on in datanode.
          */
         proxySource.activateDelay(DELAY_AFTER_ERROR);
-        target.activateDelay(DELAY_AFTER_ERROR);
+        target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR);
       } finally {
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
         IOUtils.closeSocket(sock);
         
         proxySource.removePendingBlock(this);
-        target.removePendingBlock(this);
+        target.getBalancerDatanode().removePendingBlock(this);
 
         synchronized (this ) {
           reset();
@@ -415,7 +446,7 @@ public class Balancer {
         StorageType storageType, 
         Token<BlockTokenIdentifier> accessToken) throws IOException {
       new Sender(out).replaceBlock(eb, storageType, accessToken,
-          source.getStorageID(), proxySource.getDatanode());
+          source.getDatanode().getDatanodeUuid(), proxySource.datanode);
     }
     
     /* Receive a block copy response from the input stream */ 
@@ -455,8 +486,9 @@ public class Balancer {
   /* A class for keeping track of blocks in the Balancer */
   static private class BalancerBlock {
     private final Block block; // the block
-    private final List<BalancerDatanode> locations
-            = new ArrayList<BalancerDatanode>(3); // its locations
+    /** The locations of the replicas of the block. */
+    private final List<BalancerDatanode.StorageGroup> locations
+        = new ArrayList<BalancerDatanode.StorageGroup>(3);
     
     /* Constructor */
     private BalancerBlock(Block block) {
@@ -469,20 +501,19 @@ public class Balancer {
     }
     
     /* add a location */
-    private synchronized void addLocation(BalancerDatanode datanode) {
-      if (!locations.contains(datanode)) {
-        locations.add(datanode);
+    private synchronized void addLocation(BalancerDatanode.StorageGroup g) {
+      if (!locations.contains(g)) {
+        locations.add(g);
       }
     }
     
-    /* Return if the block is located on <code>datanode</code> */
-    private synchronized boolean isLocatedOnDatanode(
-        BalancerDatanode datanode) {
-      return locations.contains(datanode);
+    /** @return if the block is located on the given storage group. */
+    private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) {
+      return locations.contains(g);
     }
     
     /* Return its locations */
-    private synchronized List<BalancerDatanode> getLocations() {
+    private synchronized List<BalancerDatanode.StorageGroup> getLocations() {
       return locations;
     }
     
@@ -499,37 +530,84 @@ public class Balancer {
   
   /* The class represents a desired move of bytes between two nodes 
    * and the target.
-   * An object of this class is stored in a source node. 
+   * An object of this class is stored in a source. 
    */
-  static private class NodeTask {
-    private final BalancerDatanode datanode; //target node
+  static private class Task {
+    private final BalancerDatanode.StorageGroup target;
     private long size;  //bytes scheduled to move
     
     /* constructor */
-    private NodeTask(BalancerDatanode datanode, long size) {
-      this.datanode = datanode;
+    private Task(BalancerDatanode.StorageGroup target, long size) {
+      this.target = target;
       this.size = size;
     }
-    
-    /* Get the node */
-    private BalancerDatanode getDatanode() {
-      return datanode;
-    }
-    
-    /* Get the number of bytes that need to be moved */
-    private long getSize() {
-      return size;
-    }
   }
   
   
   /* A class that keeps track of a datanode in Balancer */
   private static class BalancerDatanode {
-    final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
+    
+    /** A group of storages in a datanode with the same storage type. */
+    private class StorageGroup {
+      final StorageType storageType;
+      final double utilization;
+      final long maxSize2Move;
+      private long scheduledSize = 0L;
+
+      private StorageGroup(StorageType storageType, double utilization,
+          long maxSize2Move) {
+        this.storageType = storageType;
+        this.utilization = utilization;
+        this.maxSize2Move = maxSize2Move;
+      }
+      
+      BalancerDatanode getBalancerDatanode() {
+        return BalancerDatanode.this;
+      }
+
+      DatanodeInfo getDatanode() {
+        return BalancerDatanode.this.datanode;
+      }
+
+      /** Decide if still need to move more bytes */
+      protected synchronized boolean hasSpaceForScheduling() {
+        return availableSizeToMove() > 0L;
+      }
+
+      /** @return the total number of bytes that need to be moved */
+      synchronized long availableSizeToMove() {
+        return maxSize2Move - scheduledSize;
+      }
+      
+      /** increment scheduled size */
+      synchronized void incScheduledSize(long size) {
+        scheduledSize += size;
+      }
+      
+      /** @return scheduled size */
+      synchronized long getScheduledSize() {
+        return scheduledSize;
+      }
+      
+      /** Reset scheduled size to zero. */
+      synchronized void resetScheduledSize() {
+        scheduledSize = 0L;
+      }
+
+      /** @return the name for display */
+      String getDisplayName() {
+        return datanode + ":" + storageType;
+      }
+      
+      @Override
+      public String toString() {
+        return "" + utilization;
+      }
+    }
+
     final DatanodeInfo datanode;
-    final double utilization;
-    final long maxSize2Move;
-    private long scheduledSize = 0L;
+    final EnumMap<StorageType, StorageGroup> storageMap
+        = new EnumMap<StorageType, StorageGroup>(StorageType.class);
     protected long delayUntil = 0L;
     //  blocks being moved but not confirmed yet
     private final List<PendingBlockMove> pendingBlocks;
@@ -537,78 +615,38 @@ public class Balancer {
     
     @Override
     public String toString() {
-      return getClass().getSimpleName() + "[" + datanode
-          + ", utilization=" + utilization + "]";
+      return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
     }
 
     /* Constructor 
      * Depending on avgutil & threshold, calculate maximum bytes to move 
      */
-    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold,
-        int maxConcurrentMoves) {
-      datanode = node;
-      utilization = policy.getUtilization(node);
-      final double avgUtil = policy.getAvgUtilization();
-      long maxSizeToMove;
-
-      if (utilization >= avgUtil+threshold
-          || utilization <= avgUtil-threshold) { 
-        maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
-      } else {
-        maxSizeToMove = 
-          (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100);
-      }
-      if (utilization < avgUtil ) {
-        maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
-      }
-      this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+    private BalancerDatanode(DatanodeStorageReport report,
+        double threshold, int maxConcurrentMoves) {
+      this.datanode = report.getDatanodeInfo();
       this.maxConcurrentMoves = maxConcurrentMoves;
       this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
     }
     
-    /** Get the datanode */
-    protected DatanodeInfo getDatanode() {
-      return datanode;
-    }
-    
-    /** Get the name of the datanode */
-    protected String getDisplayName() {
-      return datanode.toString();
-    }
-    
-    /* Get the storage id of the datanode */
-    protected String getStorageID() {
-      return datanode.getDatanodeUuid();
-    }
-    
-    /** Decide if still need to move more bytes */
-    protected synchronized boolean hasSpaceForScheduling() {
-      return scheduledSize<maxSize2Move;
-    }
-
-    /** Return the total number of bytes that need to be moved */
-    protected synchronized long availableSizeToMove() {
-      return maxSize2Move-scheduledSize;
-    }
-    
-    /** increment scheduled size */
-    protected synchronized void incScheduledSize(long size) {
-      scheduledSize += size;
-    }
-    
-    /** decrement scheduled size */
-    protected synchronized void decScheduledSize(long size) {
-      scheduledSize -= size;
-    }
-    
-    /** get scheduled size */
-    protected synchronized long getScheduledSize(){
-      return scheduledSize;
-    }
-    
-    /** get scheduled size */
-    protected synchronized void setScheduledSize(long size){
-      scheduledSize = size;
+    private void put(StorageType storageType, StorageGroup g) {
+      final StorageGroup existing = storageMap.put(storageType, g);
+      Preconditions.checkState(existing == null);
+    }
+
+    StorageGroup addStorageGroup(StorageType storageType, double utilization,
+        long maxSize2Move) {
+      final StorageGroup g = new StorageGroup(storageType, utilization,
+          maxSize2Move);
+      put(storageType, g);
+      return g;
+    }
+
+    Source addSource(StorageType storageType, double utilization,
+        long maxSize2Move, Balancer balancer) {
+      final Source s = balancer.new Source(storageType, utilization,
+          maxSize2Move, this);
+      put(storageType, s);
+      return s;
     }
 
     synchronized private void activateDelay(long delta) {
@@ -651,9 +689,9 @@ public class Balancer {
       return pendingBlocks.remove(pendingBlock);
     }
   }
-  
+
   /** A node that can be the sources of a block move */
-  private class Source extends BalancerDatanode {
+  private class Source extends BalancerDatanode.StorageGroup {
     
     /* A thread that initiates a block move 
      * and waits for block move to complete */
@@ -664,7 +702,7 @@ public class Balancer {
       }
     }
     
-    private final ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2);
+    private final List<Task> tasks = new ArrayList<Task>(2);
     private long blocksToReceive = 0L;
     /* source blocks point to balancerBlocks in the global list because
      * we want to keep one copy of a block in balancer and be aware that
@@ -674,17 +712,17 @@ public class Balancer {
             = new ArrayList<BalancerBlock>();
     
     /* constructor */
-    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold,
-        int maxConcurrentMoves) {
-      super(node, policy, threshold, maxConcurrentMoves);
+    private Source(StorageType storageType, double utilization,
+        long maxSize2Move, BalancerDatanode dn) {
+      dn.super(storageType, utilization, maxSize2Move);
     }
     
-    /** Add a node task */
-    private void addNodeTask(NodeTask task) {
-      assert (task.datanode != this) :
-        "Source and target are the same " + datanode;
-      incScheduledSize(task.getSize());
-      nodeTasks.add(task);
+    /** Add a task */
+    private void addTask(Task task) {
+      Preconditions.checkState(task.target != this,
+          "Source and target are the same storage group " + getDisplayName());
+      incScheduledSize(task.size);
+      tasks.add(task);
     }
     
     /* Return an iterator to this source's blocks */
@@ -697,8 +735,10 @@ public class Balancer {
      * Return the total size of the received blocks in the number of bytes.
      */
     private long getBlockList() throws IOException {
-      BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode, 
-        Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
+      final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
+      final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(
+          getDatanode(), size).getBlocks();
+
       long bytesReceived = 0;
       for (BlockWithLocations blk : newBlocks) {
         bytesReceived += blk.getBlock().getNumBytes();
@@ -714,10 +754,13 @@ public class Balancer {
         
           synchronized (block) {
             // update locations
-            for (String datanodeUuid : blk.getDatanodeUuids()) {
-              final BalancerDatanode d = datanodeMap.get(datanodeUuid);
-              if (d != null) { // not an unknown datanode
-                block.addLocation(d);
+            final String[] datanodeUuids = blk.getDatanodeUuids();
+            final StorageType[] storageTypes = blk.getStorageTypes();
+            for (int i = 0; i < datanodeUuids.length; i++) {
+              final BalancerDatanode.StorageGroup g = storageGroupMap.get(
+                  datanodeUuids[i], storageTypes[i]);
+              if (g != null) { // not unknown
+                block.addLocation(g);
               }
             }
           }
@@ -732,8 +775,8 @@ public class Balancer {
 
     /* Decide if the given block is a good candidate to move or not */
     private boolean isGoodBlockCandidate(BalancerBlock block) {
-      for (NodeTask nodeTask : nodeTasks) {
-        if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) {
+      for (Task t : tasks) {
+        if (Balancer.this.isGoodBlockCandidate(this, t.target, block)) {
           return true;
         }
       }
@@ -748,20 +791,20 @@ public class Balancer {
      * The block should be dispatched immediately after this method is returned.
      */
     private PendingBlockMove chooseNextBlockToMove() {
-      for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) {
-        NodeTask task = tasks.next();
-        BalancerDatanode target = task.getDatanode();
+      for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
+        final Task task = i.next();
+        final BalancerDatanode target = task.target.getBalancerDatanode();
         PendingBlockMove pendingBlock = new PendingBlockMove();
         if (target.addPendingBlock(pendingBlock)) { 
           // target is not busy, so do a tentative block allocation
           pendingBlock.source = this;
-          pendingBlock.target = target;
+          pendingBlock.target = task.target;
           if ( pendingBlock.chooseBlockAndProxy() ) {
             long blockSize = pendingBlock.block.getNumBytes();
-            decScheduledSize(blockSize);
+            incScheduledSize(-blockSize);
             task.size -= blockSize;
             if (task.size == 0) {
-              tasks.remove();
+              i.remove();
             }
             return pendingBlock;
           } else {
@@ -835,7 +878,7 @@ public class Balancer {
           // in case no blocks can be moved for source node's task,
           // jump out of while-loop after 5 iterations.
           if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
-            setScheduledSize(0);
+            resetScheduledSize();
           }
         }
         
@@ -902,108 +945,154 @@ public class Balancer {
         IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
   }
   
-  /* Given a data node set, build a network topology and decide
-   * over-utilized datanodes, above average utilized datanodes, 
-   * below average utilized datanodes, and underutilized datanodes. 
-   * The input data node set is shuffled before the datanodes 
-   * are put into the over-utilized datanodes, above average utilized
-   * datanodes, below average utilized datanodes, and
-   * underutilized datanodes lists. This will add some randomness
-   * to the node matching later on.
-   * 
+  
+  private static long getCapacity(DatanodeStorageReport report, StorageType t) {
+    long capacity = 0L;
+    for(StorageReport r : report.getStorageReports()) {
+      if (r.getStorage().getStorageType() == t) {
+        capacity += r.getCapacity();
+      }
+    }
+    return capacity;
+  }
+
+  private static long getRemaining(DatanodeStorageReport report, StorageType t) {
+    long remaining = 0L;
+    for(StorageReport r : report.getStorageReports()) {
+      if (r.getStorage().getStorageType() == t) {
+        remaining += r.getRemaining();
+      }
+    }
+    return remaining;
+  }
+
+  private boolean shouldIgnore(DatanodeInfo dn) {
+    //ignore decommissioned nodes
+    final boolean decommissioned = dn.isDecommissioned();
+    //ignore decommissioning nodes
+    final boolean decommissioning = dn.isDecommissionInProgress();
+    // ignore nodes in exclude list
+    final boolean excluded = Util.shouldBeExcluded(nodesToBeExcluded, dn);
+    // ignore nodes not in the include list (if include list is not empty)
+    final boolean notIncluded = !Util.shouldBeIncluded(nodesToBeIncluded, dn);
+    
+    if (decommissioned || decommissioning || excluded || notIncluded) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
+            + decommissioning + ", " + excluded + ", " + notIncluded);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Given a datanode storage set, build a network topology and decide
+   * over-utilized storages, above average utilized storages, 
+   * below average utilized storages, and underutilized storages. 
+   * The input datanode storage set is shuffled in order to randomize
+   * to the storage matching later on.
+   *
    * @return the total number of bytes that are 
    *                needed to move to make the cluster balanced.
-   * @param datanodes a set of datanodes
+   * @param reports a set of datanode storage reports
    */
-  private long initNodes(DatanodeInfo[] datanodes) {
+  private long init(DatanodeStorageReport[] reports) {
     // compute average utilization
-    for (DatanodeInfo datanode : datanodes) {
-     // ignore decommissioning or decommissioned nodes or
-      // ignore nodes in exclude list
-      // or nodes not in the include list (if include list is not empty)
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() ||
-          Util.shouldBeExcluded(nodesToBeExcluded, datanode) ||
-          !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) {
-        continue;
+    for (DatanodeStorageReport r : reports) {
+      if (shouldIgnore(r.getDatanodeInfo())) {
+        continue; 
       }
-      policy.accumulateSpaces(datanode);
+      policy.accumulateSpaces(r);
     }
     policy.initAvgUtilization();
 
-    /*create network topology and all data node lists: 
-     * overloaded, above-average, below-average, and underloaded
-     * we alternates the accessing of the given datanodes array either by
-     * an increasing order or a decreasing order.
-     */  
+    // create network topology and classify utilization collections: 
+    //   over-utilized, above-average, below-average and under-utilized.
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
-    for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
-      // ignore decommissioning or decommissioned nodes or
-      // ignore nodes in exclude list
-      // or nodes not in the include list (if include list is not empty)
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() ||
-          Util.shouldBeExcluded(nodesToBeExcluded, datanode) ||
-          !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Excluding datanode " + datanode);
-        }
-        continue;
+    for(DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
+      final DatanodeInfo datanode = r.getDatanodeInfo();
+      if (shouldIgnore(datanode)) {
+        continue; // ignore decommissioning or decommissioned nodes
       }
       cluster.add(datanode);
-      BalancerDatanode datanodeS;
-      final double avg = policy.getAvgUtilization();
-      if (policy.getUtilization(datanode) > avg) {
-        datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode);
-        if (isAboveAvgUtilized(datanodeS)) {
-          this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
-        } else {
-          assert(isOverUtilized(datanodeS)) :
-            datanodeS.getDisplayName()+ "is not an overUtilized node";
-          this.overUtilizedDatanodes.add((Source)datanodeS);
-          overLoadedBytes += (long)((datanodeS.utilization-avg
-              -threshold)*datanodeS.datanode.getCapacity()/100.0);
+
+      final BalancerDatanode dn = new BalancerDatanode(r, underLoadedBytes,
+          maxConcurrentMovesPerNode);
+      for(StorageType t : StorageType.asList()) {
+        final Double utilization = policy.getUtilization(r, t);
+        if (utilization == null) { // datanode does not have such storage type 
+          continue;
         }
-      } else {
-        datanodeS = new BalancerDatanode(datanode, policy, threshold,
-            maxConcurrentMovesPerNode);
-        if ( isBelowOrEqualAvgUtilized(datanodeS)) {
-          this.belowAvgUtilizedDatanodes.add(datanodeS);
+        
+        final long capacity = getCapacity(r, t);
+        final double utilizationDiff = utilization - policy.getAvgUtilization(t);
+        final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
+        final long maxSize2Move = computeMaxSize2Move(capacity,
+            getRemaining(r, t), utilizationDiff, threshold);
+
+        final BalancerDatanode.StorageGroup g;
+        if (utilizationDiff > 0) {
+          final Source s = dn.addSource(t, utilization, maxSize2Move, this);
+          if (thresholdDiff <= 0) { // within threshold
+            aboveAvgUtilized.add(s);
+          } else {
+            overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            overUtilized.add(s);
+          }
+          g = s;
         } else {
-          assert isUnderUtilized(datanodeS) : "isUnderUtilized("
-              + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS)
-              + ", utilization=" + datanodeS.utilization; 
-          this.underUtilizedDatanodes.add(datanodeS);
-          underLoadedBytes += (long)((avg-threshold-
-              datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
+          g = dn.addStorageGroup(t, utilization, maxSize2Move);
+          if (thresholdDiff <= 0) { // within threshold
+            belowAvgUtilized.add(g);
+          } else {
+            underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            underUtilized.add(g);
+          }
         }
+        storageGroupMap.put(g);
       }
-      datanodeMap.put(datanode.getDatanodeUuid(), datanodeS);
     }
 
-    //logging
-    logNodes();
+    logUtilizationCollections();
     
-    assert (this.datanodeMap.size() == 
-      overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
-      aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
-      : "Mismatched number of datanodes";
+    Preconditions.checkState(storageGroupMap.size() == overUtilized.size()
+        + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(),
+        "Mismatched number of storage groups");
     
     // return number of bytes to be moved in order to make the cluster balanced
     return Math.max(overLoadedBytes, underLoadedBytes);
   }
 
+  private static long computeMaxSize2Move(final long capacity, final long remaining,
+      final double utilizationDiff, final double threshold) {
+    final double diff = Math.min(threshold, Math.abs(utilizationDiff));
+    long maxSizeToMove = precentage2bytes(diff, capacity);
+    if (utilizationDiff < 0) {
+      maxSizeToMove = Math.min(remaining, maxSizeToMove);
+    }
+    return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+  }
+
+  private static long precentage2bytes(double precentage, long capacity) {
+    Preconditions.checkArgument(precentage >= 0,
+        "precentage = " + precentage + " < 0");
+    return (long)(precentage * capacity / 100.0);
+  }
+
   /* log the over utilized & under utilized nodes */
-  private void logNodes() {
-    logNodes("over-utilized", overUtilizedDatanodes);
+  private void logUtilizationCollections() {
+    logUtilizationCollection("over-utilized", overUtilized);
     if (LOG.isTraceEnabled()) {
-      logNodes("above-average", aboveAvgUtilizedDatanodes);
-      logNodes("below-average", belowAvgUtilizedDatanodes);
+      logUtilizationCollection("above-average", aboveAvgUtilized);
+      logUtilizationCollection("below-average", belowAvgUtilized);
     }
-    logNodes("underutilized", underUtilizedDatanodes);
+    logUtilizationCollection("underutilized", underUtilized);
   }
 
-  private static <T extends BalancerDatanode> void logNodes(
-      String name, Collection<T> nodes) {
-    LOG.info(nodes.size() + " " + name + ": " + nodes);
+  private static <T extends BalancerDatanode.StorageGroup>
+      void logUtilizationCollection(String name, Collection<T> items) {
+    LOG.info(items.size() + " " + name + ": " + items);
   }
 
   /** A matcher interface for matching nodes. */
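
To make the new sizing concrete, take an illustrative DISK storage group with
capacity 1 TB and utilization 85% when the DISK average is 60% and the
threshold is 10:

    utilizationDiff = 85 - 60 = 25
    thresholdDiff   = |25| - 10 = 15          (> 0: the group is over-utilized)
    maxSize2Move    = min(MAX_SIZE_TO_MOVE, precentage2bytes(min(10, 25), 1 TB))
                    = min(10 GB, 100 GB) = 10 GB
    overLoadedBytes += precentage2bytes(15, 1 TB) = 150 GB

So the group joins overUtilized as a Source, contributes 150 GB to the
imbalance estimate, and has at most 10 GB scheduled off it per iteration.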
@@ -1039,26 +1128,24 @@ public class Balancer {
   /**
    * Decide all <source, target> pairs and
    * the number of bytes to move from a source to a target
-   * Maximum bytes to be moved per node is
-   * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
-   * Return total number of bytes to move in this iteration
+   * Maximum bytes to be moved per storage group is
+   * min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
+   * @return total number of bytes to move in this iteration
    */
-  private long chooseNodes() {
+  private long chooseStorageGroups() {
     // First, match nodes on the same node group if cluster is node group aware
     if (cluster.isNodeGroupAware()) {
-      chooseNodes(SAME_NODE_GROUP);
+      chooseStorageGroups(SAME_NODE_GROUP);
     }
     
     // Then, match nodes on the same rack
-    chooseNodes(SAME_RACK);
+    chooseStorageGroups(SAME_RACK);
     // At last, match all remaining nodes
-    chooseNodes(ANY_OTHER);
+    chooseStorageGroups(ANY_OTHER);
     
-    assert (datanodeMap.size() >= sources.size()+targets.size())
-      : "Mismatched number of datanodes (" +
-      datanodeMap.size() + " total, " +
-      sources.size() + " sources, " +
-      targets.size() + " targets)";
+    Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
+        "Mismatched number of datanodes (" + storageGroupMap.size() + " < "
+            + sources.size() + " sources, " + targets.size() + " targets)");
 
     long bytesToMove = 0L;
     for (Source src : sources) {
@@ -1068,25 +1155,25 @@ public class Balancer {
   }
 
   /** Decide all <source, target> pairs according to the matcher. */
-  private void chooseNodes(final Matcher matcher) {
+  private void chooseStorageGroups(final Matcher matcher) {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
-    chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
+    chooseStorageGroups(overUtilized, underUtilized, matcher);
     
     /* match each remaining overutilized datanode (source) to 
      * below average utilized datanodes (targets).
      * Note only overutilized datanodes that haven't had that max bytes to move
      * satisfied in step 1 are selected
      */
-    chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
+    chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
 
     /* match each remaining underutilized datanode (target) to 
      * above average utilized datanodes (source).
      * Note only underutilized datanodes that have not had that max bytes to
      * move satisfied in step 1 are selected.
      */
-    chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
+    chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
   }
 
   /**
@@ -1094,13 +1181,14 @@ public class Balancer {
    * datanodes or the candidates are source nodes with (utilization > Avg), and
    * the others are target nodes with (utilization < Avg).
    */
-  private <D extends BalancerDatanode, C extends BalancerDatanode> void 
-      chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
+  private <G extends BalancerDatanode.StorageGroup,
+           C extends BalancerDatanode.StorageGroup>
+      void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
           Matcher matcher) {
-    for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
-      final D datanode = i.next();
-      for(; chooseForOneDatanode(datanode, candidates, matcher); );
-      if (!datanode.hasSpaceForScheduling()) {
+    for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
+      final G g = i.next();
+      for(; choose4One(g, candidates, matcher); );
+      if (!g.hasSpaceForScheduling()) {
         i.remove();
       }
     }
@@ -1110,18 +1198,19 @@ public class Balancer {
    * For the given datanode, choose a candidate and then schedule it.
    * @return true if a candidate is chosen; false if no candidates is chosen.
    */
-  private <C extends BalancerDatanode> boolean chooseForOneDatanode(
-      BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
+  private <C extends BalancerDatanode.StorageGroup>
+      boolean choose4One(BalancerDatanode.StorageGroup g,
+          Collection<C> candidates, Matcher matcher) {
     final Iterator<C> i = candidates.iterator();
-    final C chosen = chooseCandidate(dn, i, matcher);
-
+    final C chosen = chooseCandidate(g, i, matcher);
+  
     if (chosen == null) {
       return false;
     }
-    if (dn instanceof Source) {
-      matchSourceWithTargetToMove((Source)dn, chosen);
+    if (g instanceof Source) {
+      matchSourceWithTargetToMove((Source)g, chosen);
     } else {
-      matchSourceWithTargetToMove((Source)chosen, dn);
+      matchSourceWithTargetToMove((Source)chosen, g);
     }
     if (!chosen.hasSpaceForScheduling()) {
       i.remove();
@@ -1129,27 +1218,28 @@ public class Balancer {
     return true;
   }
   
-  private void matchSourceWithTargetToMove(
-      Source source, BalancerDatanode target) {
+  private void matchSourceWithTargetToMove(Source source,
+      BalancerDatanode.StorageGroup target) {
     long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
-    NodeTask nodeTask = new NodeTask(target, size);
-    source.addNodeTask(nodeTask);
-    target.incScheduledSize(nodeTask.getSize());
+    final Task task = new Task(target, size);
+    source.addTask(task);
+    target.incScheduledSize(task.size);
     sources.add(source);
     targets.add(target);
     LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-        +source.datanode.getName() + " to " + target.datanode.getName());
+        + source.getDisplayName() + " to " + target.getDisplayName());
   }
   
   /** Choose a candidate for the given datanode. */
-  private <D extends BalancerDatanode, C extends BalancerDatanode>
-      C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
-    if (dn.hasSpaceForScheduling()) {
+  private <G extends BalancerDatanode.StorageGroup,
+           C extends BalancerDatanode.StorageGroup>
+      C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
+    if (g.hasSpaceForScheduling()) {
       for(; candidates.hasNext(); ) {
         final C c = candidates.next();
         if (!c.hasSpaceForScheduling()) {
           candidates.remove();
-        } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
+        } else if (matcher.match(cluster, g.getDatanode(), c.getDatanode())) {
           return c;
         }
       }
@@ -1203,9 +1293,10 @@ public class Balancer {
     boolean shouldWait;
     do {
       shouldWait = false;
-      for (BalancerDatanode target : targets) {
-        if (!target.isPendingQEmpty()) {
+      for (BalancerDatanode.StorageGroup target : targets) {
+        if (!target.getBalancerDatanode().isPendingQEmpty()) {
           shouldWait = true;
+          break;
         }
       }
       if (shouldWait) {
@@ -1274,12 +1365,15 @@ public class Balancer {
    * 3. doing the move does not reduce the number of racks that the block has
    */
   private boolean isGoodBlockCandidate(Source source, 
-      BalancerDatanode target, BalancerBlock block) {
+      BalancerDatanode.StorageGroup target, BalancerBlock block) {
+    if (source.storageType != target.storageType) {
+      return false;
+    }
     // check if the block is moved or not
     if (movedBlocks.contains(block)) {
-        return false;
+      return false;
     }
-    if (block.isLocatedOnDatanode(target)) {
+    if (block.isLocatedOn(target)) {
       return false;
     }
     if (cluster.isNodeGroupAware() && 
@@ -1294,8 +1388,8 @@ public class Balancer {
     } else {
       boolean notOnSameRack = true;
       synchronized (block) {
-        for (BalancerDatanode loc : block.locations) {
-          if (cluster.isOnSameRack(loc.datanode, target.datanode)) {
+        for (BalancerDatanode.StorageGroup loc : block.locations) {
+          if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
             notOnSameRack = false;
             break;
           }
@@ -1306,9 +1400,9 @@ public class Balancer {
         goodBlock = true;
       } else {
         // good if source is on the same rack as on of the replicas
-        for (BalancerDatanode loc : block.locations) {
+        for (BalancerDatanode.StorageGroup loc : block.locations) {
           if (loc != source && 
-              cluster.isOnSameRack(loc.datanode, source.datanode)) {
+              cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
             goodBlock = true;
             break;
           }
@@ -1329,25 +1423,26 @@ public class Balancer {
    * @return true if there are any replica (other than source) on the same node
    * group with target
    */
-  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
       BalancerBlock block, Source source) {
-    for (BalancerDatanode loc : block.locations) {
+    final DatanodeInfo targetDn = target.getDatanode();
+    for (BalancerDatanode.StorageGroup loc : block.locations) {
       if (loc != source && 
-        cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
-          return true;
-        }
+          cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
+        return true;
       }
+    }
     return false;
   }
 
   /* reset all fields in a balancer preparing for the next iteration */
   private void resetData(Configuration conf) {
     this.cluster = NetworkTopology.getInstance(conf);
-    this.overUtilizedDatanodes.clear();
-    this.aboveAvgUtilizedDatanodes.clear();
-    this.belowAvgUtilizedDatanodes.clear();
-    this.underUtilizedDatanodes.clear();
-    this.datanodeMap.clear();
+    this.overUtilized.clear();
+    this.aboveAvgUtilized.clear();
+    this.belowAvgUtilized.clear();
+    this.underUtilized.clear();
+    this.storageGroupMap.clear();
     this.sources.clear();
     this.targets.clear();  
     this.policy.reset();
@@ -1367,32 +1462,6 @@ public class Balancer {
       }
     }
   }
-  
-  /* Return true if the given datanode is overUtilized */
-  private boolean isOverUtilized(BalancerDatanode datanode) {
-    return datanode.utilization > (policy.getAvgUtilization()+threshold);
-  }
-  
-  /* Return true if the given datanode is above average utilized
-   * but not overUtilized */
-  private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
-    final double avg = policy.getAvgUtilization();
-    return (datanode.utilization <= (avg+threshold))
-        && (datanode.utilization > avg);
-  }
-  
-  /* Return true if the given datanode is underUtilized */
-  private boolean isUnderUtilized(BalancerDatanode datanode) {
-    return datanode.utilization < (policy.getAvgUtilization()-threshold);
-  }
-
-  /* Return true if the given datanode is below average utilized 
-   * but not underUtilized */
-  private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
-    final double avg = policy.getAvgUtilization();
-    return (datanode.utilization >= (avg-threshold))
-             && (datanode.utilization <= avg);
-  }
 
   // Exit status
   enum ReturnStatus {
@@ -1420,7 +1489,8 @@ public class Balancer {
       /* get all live datanodes of a cluster and their disk usage
        * decide the number of bytes need to be moved
        */
-      final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+      final long bytesLeftToMove = init(
+          nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE));
       if (bytesLeftToMove == 0) {
         System.out.println("The cluster is balanced. Exiting...");
         return ReturnStatus.SUCCESS;
@@ -1434,7 +1504,7 @@ public class Balancer {
        * in this iteration. Maximum bytes to be moved per node is
        * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
        */
-      final long bytesToMove = chooseNodes();
+      final long bytesToMove = chooseStorageGroups();
       if (bytesToMove == 0) {
         System.out.println("No block can be moved. Exiting...");
         return ReturnStatus.NO_MOVE_BLOCK;
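
The StorageGroupMap above keys each group by datanodeUuid + ":" + storageType,
so getBlockList() can resolve every replica to a storage group now that
getBlocks() reports a storage type per replica. A lookup sketch with
illustrative values:

    // Illustrative values; mirrors the updated loop in getBlockList().
    final String[] datanodeUuids = { "dn-1", "dn-2" };
    final StorageType[] storageTypes = { StorageType.DISK, StorageType.SSD };
    for (int i = 0; i < datanodeUuids.length; i++) {
      final BalancerDatanode.StorageGroup g =
          storageGroupMap.get(datanodeUuids[i], storageTypes[i]);
      if (g != null) {          // null: the datanode is unknown or excluded
        block.addLocation(g);   // record the replica's (datanode, type) location
      }
    }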

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java?rev=1615016&r1=1615015&r2=1615016&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java Fri Aug  1 01:12:24 2014
@@ -18,7 +18,11 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.hdfs.util.EnumDoubles;
 
 /**
  * Balancing policy.
@@ -28,31 +32,43 @@ import org.apache.hadoop.hdfs.protocol.D
  */
 @InterfaceAudience.Private
 abstract class BalancingPolicy {
-  long totalCapacity;
-  long totalUsedSpace;
-  private double avgUtilization;
+  final EnumCounters<StorageType> totalCapacities
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumCounters<StorageType> totalUsedSpaces
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumDoubles<StorageType> avgUtilizations
+      = new EnumDoubles<StorageType>(StorageType.class);
 
   void reset() {
-    totalCapacity = 0L;
-    totalUsedSpace = 0L;
-    avgUtilization = 0.0;
+    totalCapacities.reset();
+    totalUsedSpaces.reset();
+    avgUtilizations.reset();
   }
 
   /** Get the policy name. */
   abstract String getName();
 
   /** Accumulate used space and capacity. */
-  abstract void accumulateSpaces(DatanodeInfo d);
+  abstract void accumulateSpaces(DatanodeStorageReport r);
 
   void initAvgUtilization() {
-    this.avgUtilization = totalUsedSpace*100.0/totalCapacity;
+    for(StorageType t : StorageType.asList()) {
+      final long capacity = totalCapacities.get(t);
+      if (capacity > 0L) {
+        final double avg  = totalUsedSpaces.get(t)*100.0/capacity;
+        avgUtilizations.set(t, avg);
+      }
+    }
   }
-  double getAvgUtilization() {
-    return avgUtilization;
+
+  double getAvgUtilization(StorageType t) {
+    return avgUtilizations.get(t);
   }
 
-  /** Return the utilization of a datanode */
-  abstract double getUtilization(DatanodeInfo d);
+  /** @return the utilization of a particular storage type of a datanode;
+   *          or return null if the datanode does not have such storage type.
+   */
+  abstract Double getUtilization(DatanodeStorageReport r, StorageType t);
   
   @Override
   public String toString() {
@@ -84,14 +100,25 @@ abstract class BalancingPolicy {
     }
 
     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getDfsUsed();  
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getDfsUsed());
+      }
     }
     
     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getDfsUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long dfsUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          dfsUsed += s.getDfsUsed();
+        }
+      }
+      return capacity == 0L? null: dfsUsed*100.0/capacity;
     }
   }
 
@@ -108,14 +135,25 @@ abstract class BalancingPolicy {
     }
 
     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getBlockPoolUsed();  
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getBlockPoolUsed());
+      }
     }
 
     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getBlockPoolUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long blockPoolUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          blockPoolUsed += s.getBlockPoolUsed();
+        }
+      }
+      return capacity == 0L? null: blockPoolUsed*100.0/capacity;
     }
   }
 }
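
For illustration, the per-storage-type accounting that replaces the old scalar
totalCapacity/totalUsedSpace fields boils down to the following standalone
sketch. It uses java.util.EnumMap in place of the EnumCounters/EnumDoubles
helpers, so all names here are illustrative only (Java 8 syntax for brevity):

    import java.util.EnumMap;
    import java.util.Map;

    enum StorageType { DISK, SSD }

    class UtilizationSketch {
      private final Map<StorageType, Long> totalCapacities =
          new EnumMap<StorageType, Long>(StorageType.class);
      private final Map<StorageType, Long> totalUsedSpaces =
          new EnumMap<StorageType, Long>(StorageType.class);

      /** Accumulate one storage report under its storage type. */
      void accumulate(StorageType t, long capacity, long dfsUsed) {
        totalCapacities.merge(t, capacity, Long::sum);
        totalUsedSpaces.merge(t, dfsUsed, Long::sum);
      }

      /** @return utilization in percent, or null if no capacity of type t. */
      Double getUtilization(StorageType t) {
        final Long capacity = totalCapacities.get(t);
        if (capacity == null || capacity == 0L) {
          return null; // no storage of this type: caller can skip it
        }
        return totalUsedSpaces.getOrDefault(t, 0L) * 100.0 / capacity;
      }
    }

The key behavioral change is the boxed Double return type: a datanode with no
storage of the requested type can now be skipped by the caller rather than
counted as 0% utilized.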

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1615016&r1=1615015&r2=1615016&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Aug  1 01:12:24 2014
@@ -2829,12 +2829,15 @@ public class BlockManager {
     } else {
       final String[] datanodeUuids = new String[locations.size()];
       final String[] storageIDs = new String[datanodeUuids.length];
+      final StorageType[] storageTypes = new StorageType[datanodeUuids.length];
       for(int i = 0; i < locations.size(); i++) {
         final DatanodeStorageInfo s = locations.get(i);
         datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
         storageIDs[i] = s.getStorageID();
+        storageTypes[i] = s.getStorageType();
       }
-      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
+      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
+          storageTypes));
       return block.getNumBytes();
     }
   }
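
The three arrays built here are parallel: index i of datanodeUuids,
storageIDs, and storageTypes all describe the same replica. A hedged usage
sketch (the variable names are assumed from the loop above):

    // Walk one block's replicas; the parallel arrays share a common index.
    for (int i = 0; i < datanodeUuids.length; i++) {
      System.out.println("replica " + i + ": [" + storageTypes[i] + "] "
          + storageIDs[i] + "@" + datanodeUuids[i]);
    }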

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java?rev=1615016&r1=1615015&r2=1615016&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java Fri Aug  1 01:12:24 2014
@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
-import java.util.Arrays;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 
 /**
@@ -39,12 +38,15 @@ public class BlocksWithLocations {
     final Block block;
     final String[] datanodeUuids;
     final String[] storageIDs;
+    final StorageType[] storageTypes;
     
     /** constructor */
-    public BlockWithLocations(Block block, String[] datanodeUuids, String[] storageIDs) {
+    public BlockWithLocations(Block block, String[] datanodeUuids,
+        String[] storageIDs, StorageType[] storageTypes) {
       this.block = block;
       this.datanodeUuids = datanodeUuids;
       this.storageIDs = storageIDs;
+      this.storageTypes = storageTypes;
     }
     
     /** get the block */
@@ -61,7 +63,12 @@ public class BlocksWithLocations {
     public String[] getStorageIDs() {
       return storageIDs;
     }
-    
+
+    /** @return the storage types */
+    public StorageType[] getStorageTypes() {
+      return storageTypes;
+    }
+
     @Override
     public String toString() {
       final StringBuilder b = new StringBuilder();
@@ -70,12 +77,18 @@ public class BlocksWithLocations {
         return b.append("[]").toString();
       }
       
-      b.append(storageIDs[0]).append('@').append(datanodeUuids[0]);
+      appendString(0, b.append("["));
       for(int i = 1; i < datanodeUuids.length; i++) {
-        b.append(", ").append(storageIDs[i]).append("@").append(datanodeUuids[i]);
+        appendString(i, b.append(","));
       }
       return b.append("]").toString();
     }
+    
+    private StringBuilder appendString(int i, StringBuilder b) {
+      return b.append("[").append(storageTypes[i]).append("]")
+              .append(storageIDs[i])
+              .append("@").append(datanodeUuids[i]);
+    }
   }
 
   private final BlockWithLocations[] blocks;
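
With the new appendString helper, toString tags each location with its storage
type, so a block with three DISK replicas now renders its locations roughly as
[[DISK]s1@dn1,[DISK]s2@dn2,[DISK]s3@dn3] (IDs illustrative); the bracketed
storage type prefix is the part added by this change.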

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java?rev=1615016&r1=1615015&r2=1615016&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java Fri Aug  1 01:12:24 2014
@@ -37,7 +37,7 @@ import com.google.common.base.Preconditi
 public class EnumCounters<E extends Enum<E>> {
   /** The class of the enum. */
   private final Class<E> enumClass;
-  /** The counter array, counters[i] corresponds to the enumConstants[i]. */
+  /** An array of longs corresponding to the enum type. */
   private final long[] counters;
 
   /**
@@ -75,6 +75,13 @@ public class EnumCounters<E extends Enum
     }
   }
 
+  /** Reset all counters to zero. */
+  public final void reset() {
+    for(int i = 0; i < counters.length; i++) {
+      this.counters[i] = 0L;
+    }
+  }
+
   /** Add the given value to counter e. */
   public final void add(final E e, final long value) {
     counters[e.ordinal()] += value;
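
A minimal usage sketch of the new reset() method (StorageType as the enum
type; values illustrative):

    final EnumCounters<StorageType> counters =
        new EnumCounters<StorageType>(StorageType.class);
    counters.add(StorageType.DISK, 100L);
    counters.reset();  // every counter is back to 0L
    assert counters.get(StorageType.DISK) == 0L;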

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1615016&r1=1615015&r2=1615016&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Fri Aug  1 01:12:24 2014
@@ -404,6 +404,7 @@ message BlockWithLocationsProto {
   required BlockProto block = 1;   // Block
   repeated string datanodeUuids = 2; // Datanodes with replicas of the block
   repeated string storageUuids = 3;  // Storages with replicas of the block
+  repeated StorageTypeProto storageTypes = 4; // Storage types of the replicas
 }
 
 /**
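
Because storageTypes is a new repeated field with a previously unused tag
number, older readers simply skip it on the wire, so the change is compatible
in both directions. A hedged sketch of populating the generated Java builder
(standard protoc-java method naming assumed; values illustrative):

    BlockWithLocationsProto proto = BlockWithLocationsProto.newBuilder()
        .setBlock(blockProto)                    // a BlockProto built elsewhere
        .addDatanodeUuids("dn1")
        .addStorageUuids("s1")
        .addStorageTypes(StorageTypeProto.DISK)  // the new field
        .build();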

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1615016&r1=1615015&r2=1615016&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Fri Aug  1 01:12:24 2014
@@ -184,8 +184,10 @@ public class TestPBHelper {
   private static BlockWithLocations getBlockWithLocations(int bid) {
     final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
     final String[] storageIDs = {"s1", "s2", "s3"};
+    final StorageType[] storageTypes = {
+        StorageType.DISK, StorageType.DISK, StorageType.DISK};
     return new BlockWithLocations(new Block(bid, 0, 1),
-        datanodeUuids, storageIDs);
+        datanodeUuids, storageIDs, storageTypes);
   }
 
   private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {