Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2014/08/05 04:31:00 UTC

svn commit: r1615844 [2/5] - in /hadoop/common/branches/fs-encryption/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/src/main/j...

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Tue Aug  5 02:30:54 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
@@ -456,6 +457,11 @@ public class Hdfs extends AbstractFileSy
     dfs.removeXAttr(getUriPath(path), name);
   }
 
+  @Override
+  public void access(Path path, final FsAction mode) throws IOException {
+    dfs.checkAccess(getUriPath(path), mode);
+  }
+
   /**
    * Renew an existing delegation token.
    * 

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Aug  5 02:30:54 2014
@@ -132,6 +132,7 @@ import org.apache.hadoop.fs.XAttrSetFlag
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -2951,6 +2952,17 @@ public class DFSClient implements java.i
     }
   }
 
+  public void checkAccess(String src, FsAction mode) throws IOException {
+    checkOpen();
+    try {
+      namenode.checkAccess(src, mode);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+          FileNotFoundException.class,
+          UnresolvedPathException.class);
+    }
+  }
+
   @Override // RemotePeerFactory
   public Peer newConnectedPeer(InetSocketAddress addr,
       Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Aug  5 02:30:54 2014
@@ -381,8 +381,6 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;
   public static final String  DFS_DATANODE_MAX_RECEIVER_THREADS_KEY = "dfs.datanode.max.transfer.threads";
   public static final int     DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096;
-  public static final String  DFS_DATANODE_NUMBLOCKS_KEY = "dfs.datanode.numblocks";
-  public static final int     DFS_DATANODE_NUMBLOCKS_DEFAULT = 64;
   public static final String  DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
   public static final int     DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
   public static final String  DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
@@ -668,4 +666,8 @@ public class DFSConfigKeys extends Commo
    public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
      "dfs.datanode.slow.io.warning.threshold.ms";
    public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
+
+   public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY =
+       "dfs.datanode.block.id.layout.upgrade.threads";
+   public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12;
 }

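Usage sketch (not part of the patch): judging by its name, the new key sets the number of threads a DataNode uses for the block-ID-based layout upgrade, with a default of DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS (12). It would normally go in hdfs-site.xml; the snippet below shows a programmatic override, assuming a test-style Configuration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    Configuration conf = new Configuration();
    // Lower the upgrade parallelism for a small test cluster (default is 12).
    conf.setInt(DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY, 4);
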
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Tue Aug  5 02:30:54 2014
@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -1913,4 +1914,23 @@ public class DistributedFileSystem exten
       }
     }.resolve(this, absF);
   }
+
+  @Override
+  public void access(Path path, final FsAction mode) throws IOException {
+    final Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p) throws IOException {
+        dfs.checkAccess(getPathName(p), mode);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        fs.access(p, mode);
+        return null;
+      }
+    }.resolve(this, absF);
+  }
 }

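Usage sketch (not part of the patch): from user code the new check is reached through FileSystem#access, which the override above resolves down to DFSClient#checkAccess. The path below is hypothetical and conf is assumed to point at the HDFS cluster.

    import java.io.FileNotFoundException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.security.AccessControlException;

    FileSystem fs = FileSystem.get(conf);
    Path p = new Path("/user/alice/report.csv");   // hypothetical path
    try {
      fs.access(p, FsAction.READ_WRITE);           // throws if read or write would be denied
      // granted for the current user at this instant
    } catch (AccessControlException ace) {
      // access denied
    } catch (FileNotFoundException fnfe) {
      // path does not exist
    }
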
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java Tue Aug  5 02:30:54 2014
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -32,4 +35,11 @@ public enum StorageType {
   SSD;
 
   public static final StorageType DEFAULT = DISK;
+  public static final StorageType[] EMPTY_ARRAY = {};
+  
+  private static final StorageType[] VALUES = values();
+  
+  public static List<StorageType> asList() {
+    return Arrays.asList(VALUES);
+  }
 }
\ No newline at end of file

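Usage sketch (not part of the patch): asList() exposes the cached values() array so callers can iterate storage types without re-allocating, and EMPTY_ARRAY gives a shared zero-length array.

    import java.util.List;
    import org.apache.hadoop.hdfs.StorageType;

    List<StorageType> all = StorageType.asList();   // backed by the cached values() array
    StorageType[] none = StorageType.EMPTY_ARRAY;   // shared zero-length array
    StorageType dflt = StorageType.DEFAULT;         // DISK
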
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java Tue Aug  5 02:30:54 2014
@@ -50,6 +50,9 @@ public class Block implements Writable, 
   public static final Pattern metaFilePattern = Pattern
       .compile(BLOCK_FILE_PREFIX + "(-??\\d++)_(\\d++)\\" + METADATA_EXTENSION
           + "$");
+  public static final Pattern metaOrBlockFilePattern = Pattern
+      .compile(BLOCK_FILE_PREFIX + "(-??\\d++)(_(\\d++)\\" + METADATA_EXTENSION
+          + ")?$");
 
   public static boolean isBlockFilename(File f) {
     String name = f.getName();
@@ -65,6 +68,11 @@ public class Block implements Writable, 
     return metaFilePattern.matcher(name).matches();
   }
 
+  public static File metaToBlockFile(File metaFile) {
+    return new File(metaFile.getParent(), metaFile.getName().substring(
+        0, metaFile.getName().lastIndexOf('_')));
+  }
+
   /**
    * Get generation stamp from the name of the metafile name
    */
@@ -75,10 +83,10 @@ public class Block implements Writable, 
   }
 
   /**
-   * Get the blockId from the name of the metafile name
+   * Get the blockId from the name of the meta or block file
    */
-  public static long getBlockId(String metaFile) {
-    Matcher m = metaFilePattern.matcher(metaFile);
+  public static long getBlockId(String metaOrBlockFile) {
+    Matcher m = metaOrBlockFilePattern.matcher(metaOrBlockFile);
     return m.matches() ? Long.parseLong(m.group(1)) : 0;
   }
 

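Usage sketch (not part of the patch): the widened pattern lets the same call pull a block ID out of either a block file name or its metadata file name, and metaToBlockFile() strips the generation-stamp suffix. The file names and path below are made up.

    import java.io.File;
    import org.apache.hadoop.hdfs.protocol.Block;

    long id1 = Block.getBlockId("blk_1073741825");             // block file name -> 1073741825
    long id2 = Block.getBlockId("blk_1073741825_1001.meta");   // meta file name  -> 1073741825
    File meta = new File("/data/current/blk_1073741825_1001.meta");  // hypothetical path
    File blockFile = Block.metaToBlockFile(meta);              // /data/current/blk_1073741825
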
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Aug  5 02:30:54 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -1346,4 +1347,22 @@ public interface ClientProtocol {
    */
   @AtMostOnce
   public void removeXAttr(String src, XAttr xAttr) throws IOException;
+
+  /**
+   * Checks if the user can access a path.  The mode specifies which access
+   * checks to perform.  If the requested permissions are granted, then the
+   * method returns normally.  If access is denied, then the method throws an
+   * {@link AccessControlException}.
+   * In general, applications should avoid using this method, due to the risk of
+   * time-of-check/time-of-use race conditions.  The permissions on a file may
+   * change immediately after the access call returns.
+   *
+   * @param path Path to check
+   * @param mode type of access to check
+   * @throws AccessControlException if access is denied
+   * @throws FileNotFoundException if the path does not exist
+   * @throws IOException see specific implementation
+   */
+  @Idempotent
+  public void checkAccess(String path, FsAction mode) throws IOException;
 }

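The time-of-check/time-of-use caveat above deserves a concrete illustration (not part of the patch): checking first and acting later leaves a window in which the permissions can change, so callers that can tolerate it should simply attempt the operation and handle the denial. The sketch assumes an existing FileSystem handle fs and a hypothetical path.

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.security.AccessControlException;

    Path path = new Path("/user/alice/events.log");   // hypothetical path
    // Racy: fs.access(path, FsAction.WRITE) followed by fs.append(path) leaves a window
    // in which the permissions can change underneath the caller.
    // Where possible, just attempt the operation and handle denial:
    try (FSDataOutputStream out = fs.append(path)) {
      out.writeBytes("...\n");
    } catch (AccessControlException ace) {
      // denied at the time of the actual write
    }
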
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Tue Aug  5 02:30:54 2014
@@ -175,6 +175,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
@@ -325,6 +327,9 @@ public class ClientNamenodeProtocolServe
   private static final RemoveXAttrResponseProto
     VOID_REMOVEXATTR_RESPONSE = RemoveXAttrResponseProto.getDefaultInstance();
 
+  private static final CheckAccessResponseProto
+    VOID_CHECKACCESS_RESPONSE = CheckAccessResponseProto.getDefaultInstance();
+
   /**
    * Constructor
    * 
@@ -1375,4 +1380,15 @@ public class ClientNamenodeProtocolServe
     }
     return VOID_REMOVEXATTR_RESPONSE;
   }
+
+  @Override
+  public CheckAccessResponseProto checkAccess(RpcController controller,
+     CheckAccessRequestProto req) throws ServiceException {
+    try {
+      server.checkAccess(req.getPath(), PBHelper.convert(req.getMode()));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return VOID_CHECKACCESS_RESPONSE;
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Tue Aug  5 02:30:54 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -147,6 +148,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
@@ -1400,4 +1402,15 @@ public class ClientNamenodeProtocolTrans
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public void checkAccess(String path, FsAction mode) throws IOException {
+    CheckAccessRequestProto req = CheckAccessRequestProto.newBuilder()
+        .setPath(path).setMode(PBHelper.convert(mode)).build();
+    try {
+      rpcProxy.checkAccess(null, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Aug  5 02:30:54 2014
@@ -357,15 +357,19 @@ public class PBHelper {
     return BlockWithLocationsProto.newBuilder()
         .setBlock(convert(blk.getBlock()))
         .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
-        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build();
+        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
+        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
+        .build();
   }
 
   public static BlockWithLocations convert(BlockWithLocationsProto b) {
     final List<String> datanodeUuids = b.getDatanodeUuidsList();
     final List<String> storageUuids = b.getStorageUuidsList();
+    final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
     return new BlockWithLocations(convert(b.getBlock()),
         datanodeUuids.toArray(new String[datanodeUuids.size()]),
-        storageUuids.toArray(new String[storageUuids.size()]));
+        storageUuids.toArray(new String[storageUuids.size()]),
+        convertStorageTypes(storageTypes, storageUuids.size()));
   }
 
   public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
@@ -2122,11 +2126,11 @@ public class PBHelper {
     return castEnum(v, XATTR_NAMESPACE_VALUES);
   }
 
-  private static FsActionProto convert(FsAction v) {
+  public static FsActionProto convert(FsAction v) {
     return FsActionProto.valueOf(v != null ? v.ordinal() : 0);
   }
 
-  private static FsAction convert(FsActionProto v) {
+  public static FsAction convert(FsActionProto v) {
     return castEnum(v, FSACTION_VALUES);
   }
 

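Usage sketch (not part of the patch): the two converters are made public so the protocol translators above can map FsAction onto the wire enum; the mapping is ordinal-based, so the round trip is lossless. The FsActionProto import assumes the generated class from acl.proto.

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.hdfs.protocol.proto.AclProtos.FsActionProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;

    FsActionProto wire = PBHelper.convert(FsAction.READ_EXECUTE);  // ordinal-based mapping
    FsAction back = PBHelper.convert(wire);                        // READ_EXECUTE again
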
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Aug  5 02:30:54 2014
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumMap;
 import java.util.Formatter;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -45,6 +46,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -78,16 +80,21 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.base.Preconditions;
+
 /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
  * when some datanodes become full or when new empty nodes join the cluster.
  * The tool is deployed as an application program that can be run by the 
@@ -188,7 +195,9 @@ import org.apache.hadoop.util.ToolRunner
 @InterfaceAudience.Private
 public class Balancer {
   static final Log LOG = LogFactory.getLog(Balancer.class);
-  final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+  final private static long GB = 1L << 30; //1GB
+  final private static long MAX_SIZE_TO_MOVE = 10*GB;
+  final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
   private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
 
   /** The maximum number of concurrent blocks moves for 
@@ -203,34 +212,38 @@ public class Balancer {
       + "\n\t[-policy <policy>]\tthe balancing policy: "
       + BalancingPolicy.Node.INSTANCE.getName() + " or "
       + BalancingPolicy.Pool.INSTANCE.getName()
-      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
+      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity"
+      + "\n\t[-exclude [-f <hosts-file> | comma-sperated list of hosts]]"
+      + "\tExcludes the specified datanodes."
+      + "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]"
+      + "\tIncludes only the specified datanodes.";
   
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
   private final SaslDataTransferClient saslClient;
   private final double threshold;
+  // set of data nodes to be excluded from balancing operations.
+  Set<String> nodesToBeExcluded;
+  //Restrict balancing to the following nodes.
+  Set<String> nodesToBeIncluded;
   
   // all data node lists
-  private final Collection<Source> overUtilizedDatanodes
-                               = new LinkedList<Source>();
-  private final Collection<Source> aboveAvgUtilizedDatanodes
-                               = new LinkedList<Source>();
-  private final Collection<BalancerDatanode> belowAvgUtilizedDatanodes
-                               = new LinkedList<BalancerDatanode>();
-  private final Collection<BalancerDatanode> underUtilizedDatanodes
-                               = new LinkedList<BalancerDatanode>();
-  
-  private final Collection<Source> sources
-                               = new HashSet<Source>();
-  private final Collection<BalancerDatanode> targets
-                               = new HashSet<BalancerDatanode>();
+  private final Collection<Source> overUtilized = new LinkedList<Source>();
+  private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
+  private final Collection<BalancerDatanode.StorageGroup> belowAvgUtilized
+      = new LinkedList<BalancerDatanode.StorageGroup>();
+  private final Collection<BalancerDatanode.StorageGroup> underUtilized
+      = new LinkedList<BalancerDatanode.StorageGroup>();
+  
+  private final Collection<Source> sources = new HashSet<Source>();
+  private final Collection<BalancerDatanode.StorageGroup> targets
+      = new HashSet<BalancerDatanode.StorageGroup>();
   
   private final Map<Block, BalancerBlock> globalBlockList
                  = new HashMap<Block, BalancerBlock>();
   private final MovedBlocks movedBlocks = new MovedBlocks();
-  /** Map (datanodeUuid -> BalancerDatanodes) */
-  private final Map<String, BalancerDatanode> datanodeMap
-      = new HashMap<String, BalancerDatanode>();
+  /** Map (datanodeUuid,storageType -> StorageGroup) */
+  private final StorageGroupMap storageGroupMap = new StorageGroupMap();
   
   private NetworkTopology cluster;
 
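As a usage note (not part of the patch): with these flags an operator could run, for example, a hypothetical "hdfs balancer -threshold 10 -exclude -f /tmp/exclude-hosts.txt" to leave the listed datanodes untouched, or "hdfs balancer -include dn1.example.com,dn2.example.com" to balance only those two nodes; an empty include list means all datanodes remain eligible.
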
@@ -238,12 +251,39 @@ public class Balancer {
   private final ExecutorService dispatcherExecutor;
   private final int maxConcurrentMovesPerNode;
 
+
+  private static class StorageGroupMap {
+    private static String toKey(String datanodeUuid, StorageType storageType) {
+      return datanodeUuid + ":" + storageType;
+    }
+
+    private final Map<String, BalancerDatanode.StorageGroup> map
+        = new HashMap<String, BalancerDatanode.StorageGroup>();
+    
+    BalancerDatanode.StorageGroup get(String datanodeUuid, StorageType storageType) {
+      return map.get(toKey(datanodeUuid, storageType));
+    }
+    
+    void put(BalancerDatanode.StorageGroup g) {
+      final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType);
+      final BalancerDatanode.StorageGroup existing = map.put(key, g);
+      Preconditions.checkState(existing == null);
+    }
+    
+    int size() {
+      return map.size();
+    }
+    
+    void clear() {
+      map.clear();
+    }
+  }
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
     private BalancerBlock block;
     private Source source;
     private BalancerDatanode proxySource;
-    private BalancerDatanode target;
+    private BalancerDatanode.StorageGroup target;
     
     /** constructor */
     private PendingBlockMove() {
@@ -254,7 +294,7 @@ public class Balancer {
       final Block b = block.getBlock();
       return b + " with size=" + b.getNumBytes() + " from "
           + source.getDisplayName() + " to " + target.getDisplayName()
-          + " through " + proxySource.getDisplayName();
+          + " through " + proxySource.datanode;
     }
 
     /* choose a block & a proxy source for this pendingMove 
@@ -306,20 +346,20 @@ public class Balancer {
       final DatanodeInfo targetDN = target.getDatanode();
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
-        for (BalancerDatanode loc : block.getLocations()) {
+        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
           if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) {
             return true;
           }
         }
       }
       // check if there is replica which is on the same rack with the target
-      for (BalancerDatanode loc : block.getLocations()) {
+      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
         if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
           return true;
         }
       }
       // find out a non-busy replica
-      for (BalancerDatanode loc : block.getLocations()) {
+      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
         if (addTo(loc)) {
           return true;
         }
@@ -327,8 +367,9 @@ public class Balancer {
       return false;
     }
     
-    // add a BalancerDatanode as proxy source for specific block movement
-    private boolean addTo(BalancerDatanode bdn) {
+    /** add to a proxy source for specific block movement */
+    private boolean addTo(BalancerDatanode.StorageGroup g) {
+      final BalancerDatanode bdn = g.getBalancerDatanode();
       if (bdn.addPendingBlock(this)) {
         proxySource = bdn;
         return true;
@@ -344,7 +385,7 @@ public class Balancer {
       DataInputStream in = null;
       try {
         sock.connect(
-            NetUtils.createSocketAddr(target.datanode.getXferAddr()),
+            NetUtils.createSocketAddr(target.getDatanode().getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
         /* Unfortunately we don't have a good way to know if the Datanode is
          * taking a really long time to move a block, OR something has
@@ -361,7 +402,7 @@ public class Balancer {
         ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
         Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, nnc, accessToken, target.datanode);
+          unbufIn, nnc, accessToken, target.getDatanode());
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
@@ -381,14 +422,14 @@ public class Balancer {
          * gets out of sync with work going on in datanode.
          */
         proxySource.activateDelay(DELAY_AFTER_ERROR);
-        target.activateDelay(DELAY_AFTER_ERROR);
+        target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR);
       } finally {
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
         IOUtils.closeSocket(sock);
         
         proxySource.removePendingBlock(this);
-        target.removePendingBlock(this);
+        target.getBalancerDatanode().removePendingBlock(this);
 
         synchronized (this ) {
           reset();
@@ -404,7 +445,7 @@ public class Balancer {
         StorageType storageType, 
         Token<BlockTokenIdentifier> accessToken) throws IOException {
       new Sender(out).replaceBlock(eb, storageType, accessToken,
-          source.getStorageID(), proxySource.getDatanode());
+          source.getDatanode().getDatanodeUuid(), proxySource.datanode);
     }
     
     /* Receive a block copy response from the input stream */ 
@@ -444,8 +485,9 @@ public class Balancer {
   /* A class for keeping track of blocks in the Balancer */
   static private class BalancerBlock {
     private final Block block; // the block
-    private final List<BalancerDatanode> locations
-            = new ArrayList<BalancerDatanode>(3); // its locations
+    /** The locations of the replicas of the block. */
+    private final List<BalancerDatanode.StorageGroup> locations
+        = new ArrayList<BalancerDatanode.StorageGroup>(3);
     
     /* Constructor */
     private BalancerBlock(Block block) {
@@ -458,20 +500,19 @@ public class Balancer {
     }
     
     /* add a location */
-    private synchronized void addLocation(BalancerDatanode datanode) {
-      if (!locations.contains(datanode)) {
-        locations.add(datanode);
+    private synchronized void addLocation(BalancerDatanode.StorageGroup g) {
+      if (!locations.contains(g)) {
+        locations.add(g);
       }
     }
     
-    /* Return if the block is located on <code>datanode</code> */
-    private synchronized boolean isLocatedOnDatanode(
-        BalancerDatanode datanode) {
-      return locations.contains(datanode);
+    /** @return if the block is located on the given storage group. */
+    private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) {
+      return locations.contains(g);
     }
     
     /* Return its locations */
-    private synchronized List<BalancerDatanode> getLocations() {
+    private synchronized List<BalancerDatanode.StorageGroup> getLocations() {
       return locations;
     }
     
@@ -488,37 +529,84 @@ public class Balancer {
   
   /* The class represents a desired move of bytes between two nodes 
    * and the target.
-   * An object of this class is stored in a source node. 
+   * An object of this class is stored in a source. 
    */
-  static private class NodeTask {
-    private final BalancerDatanode datanode; //target node
+  static private class Task {
+    private final BalancerDatanode.StorageGroup target;
     private long size;  //bytes scheduled to move
     
     /* constructor */
-    private NodeTask(BalancerDatanode datanode, long size) {
-      this.datanode = datanode;
+    private Task(BalancerDatanode.StorageGroup target, long size) {
+      this.target = target;
       this.size = size;
     }
-    
-    /* Get the node */
-    private BalancerDatanode getDatanode() {
-      return datanode;
-    }
-    
-    /* Get the number of bytes that need to be moved */
-    private long getSize() {
-      return size;
-    }
   }
   
   
   /* A class that keeps track of a datanode in Balancer */
   private static class BalancerDatanode {
-    final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
+    
+    /** A group of storages in a datanode with the same storage type. */
+    private class StorageGroup {
+      final StorageType storageType;
+      final double utilization;
+      final long maxSize2Move;
+      private long scheduledSize = 0L;
+
+      private StorageGroup(StorageType storageType, double utilization,
+          long maxSize2Move) {
+        this.storageType = storageType;
+        this.utilization = utilization;
+        this.maxSize2Move = maxSize2Move;
+      }
+      
+      BalancerDatanode getBalancerDatanode() {
+        return BalancerDatanode.this;
+      }
+
+      DatanodeInfo getDatanode() {
+        return BalancerDatanode.this.datanode;
+      }
+
+      /** Decide if still need to move more bytes */
+      protected synchronized boolean hasSpaceForScheduling() {
+        return availableSizeToMove() > 0L;
+      }
+
+      /** @return the total number of bytes that need to be moved */
+      synchronized long availableSizeToMove() {
+        return maxSize2Move - scheduledSize;
+      }
+      
+      /** increment scheduled size */
+      synchronized void incScheduledSize(long size) {
+        scheduledSize += size;
+      }
+      
+      /** @return scheduled size */
+      synchronized long getScheduledSize() {
+        return scheduledSize;
+      }
+      
+      /** Reset scheduled size to zero. */
+      synchronized void resetScheduledSize() {
+        scheduledSize = 0L;
+      }
+
+      /** @return the name for display */
+      String getDisplayName() {
+        return datanode + ":" + storageType;
+      }
+      
+      @Override
+      public String toString() {
+        return "" + utilization;
+      }
+    }
+
     final DatanodeInfo datanode;
-    final double utilization;
-    final long maxSize2Move;
-    private long scheduledSize = 0L;
+    final EnumMap<StorageType, StorageGroup> storageMap
+        = new EnumMap<StorageType, StorageGroup>(StorageType.class);
     protected long delayUntil = 0L;
     //  blocks being moved but not confirmed yet
     private final List<PendingBlockMove> pendingBlocks;
@@ -526,78 +614,38 @@ public class Balancer {
     
     @Override
     public String toString() {
-      return getClass().getSimpleName() + "[" + datanode
-          + ", utilization=" + utilization + "]";
+      return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
     }
 
     /* Constructor 
      * Depending on avgutil & threshold, calculate maximum bytes to move 
      */
-    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold,
-        int maxConcurrentMoves) {
-      datanode = node;
-      utilization = policy.getUtilization(node);
-      final double avgUtil = policy.getAvgUtilization();
-      long maxSizeToMove;
-
-      if (utilization >= avgUtil+threshold
-          || utilization <= avgUtil-threshold) { 
-        maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
-      } else {
-        maxSizeToMove = 
-          (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100);
-      }
-      if (utilization < avgUtil ) {
-        maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
-      }
-      this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+    private BalancerDatanode(DatanodeStorageReport report,
+        double threshold, int maxConcurrentMoves) {
+      this.datanode = report.getDatanodeInfo();
       this.maxConcurrentMoves = maxConcurrentMoves;
       this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
     }
     
-    /** Get the datanode */
-    protected DatanodeInfo getDatanode() {
-      return datanode;
-    }
-    
-    /** Get the name of the datanode */
-    protected String getDisplayName() {
-      return datanode.toString();
-    }
-    
-    /* Get the storage id of the datanode */
-    protected String getStorageID() {
-      return datanode.getDatanodeUuid();
-    }
-    
-    /** Decide if still need to move more bytes */
-    protected synchronized boolean hasSpaceForScheduling() {
-      return scheduledSize<maxSize2Move;
-    }
-
-    /** Return the total number of bytes that need to be moved */
-    protected synchronized long availableSizeToMove() {
-      return maxSize2Move-scheduledSize;
-    }
-    
-    /** increment scheduled size */
-    protected synchronized void incScheduledSize(long size) {
-      scheduledSize += size;
-    }
-    
-    /** decrement scheduled size */
-    protected synchronized void decScheduledSize(long size) {
-      scheduledSize -= size;
-    }
-    
-    /** get scheduled size */
-    protected synchronized long getScheduledSize(){
-      return scheduledSize;
-    }
-    
-    /** get scheduled size */
-    protected synchronized void setScheduledSize(long size){
-      scheduledSize = size;
+    private void put(StorageType storageType, StorageGroup g) {
+      final StorageGroup existing = storageMap.put(storageType, g);
+      Preconditions.checkState(existing == null);
+    }
+
+    StorageGroup addStorageGroup(StorageType storageType, double utilization,
+        long maxSize2Move) {
+      final StorageGroup g = new StorageGroup(storageType, utilization,
+          maxSize2Move);
+      put(storageType, g);
+      return g;
+    }
+
+    Source addSource(StorageType storageType, double utilization,
+        long maxSize2Move, Balancer balancer) {
+      final Source s = balancer.new Source(storageType, utilization,
+          maxSize2Move, this);
+      put(storageType, s);
+      return s;
     }
 
     synchronized private void activateDelay(long delta) {
@@ -640,9 +688,9 @@ public class Balancer {
       return pendingBlocks.remove(pendingBlock);
     }
   }
-  
+
   /** A node that can be the sources of a block move */
-  private class Source extends BalancerDatanode {
+  private class Source extends BalancerDatanode.StorageGroup {
     
     /* A thread that initiates a block move 
      * and waits for block move to complete */
@@ -653,7 +701,7 @@ public class Balancer {
       }
     }
     
-    private final ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2);
+    private final List<Task> tasks = new ArrayList<Task>(2);
     private long blocksToReceive = 0L;
     /* source blocks point to balancerBlocks in the global list because
      * we want to keep one copy of a block in balancer and be aware that
@@ -663,17 +711,17 @@ public class Balancer {
             = new ArrayList<BalancerBlock>();
     
     /* constructor */
-    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold,
-        int maxConcurrentMoves) {
-      super(node, policy, threshold, maxConcurrentMoves);
+    private Source(StorageType storageType, double utilization,
+        long maxSize2Move, BalancerDatanode dn) {
+      dn.super(storageType, utilization, maxSize2Move);
     }
     
-    /** Add a node task */
-    private void addNodeTask(NodeTask task) {
-      assert (task.datanode != this) :
-        "Source and target are the same " + datanode;
-      incScheduledSize(task.getSize());
-      nodeTasks.add(task);
+    /** Add a task */
+    private void addTask(Task task) {
+      Preconditions.checkState(task.target != this,
+          "Source and target are the same storage group " + getDisplayName());
+      incScheduledSize(task.size);
+      tasks.add(task);
     }
     
     /* Return an iterator to this source's blocks */
@@ -686,8 +734,10 @@ public class Balancer {
      * Return the total size of the received blocks in the number of bytes.
      */
     private long getBlockList() throws IOException {
-      BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode, 
-        Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
+      final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
+      final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(
+          getDatanode(), size).getBlocks();
+
       long bytesReceived = 0;
       for (BlockWithLocations blk : newBlocks) {
         bytesReceived += blk.getBlock().getNumBytes();
@@ -703,10 +753,13 @@ public class Balancer {
         
           synchronized (block) {
             // update locations
-            for (String datanodeUuid : blk.getDatanodeUuids()) {
-              final BalancerDatanode d = datanodeMap.get(datanodeUuid);
-              if (d != null) { // not an unknown datanode
-                block.addLocation(d);
+            final String[] datanodeUuids = blk.getDatanodeUuids();
+            final StorageType[] storageTypes = blk.getStorageTypes();
+            for (int i = 0; i < datanodeUuids.length; i++) {
+              final BalancerDatanode.StorageGroup g = storageGroupMap.get(
+                  datanodeUuids[i], storageTypes[i]);
+              if (g != null) { // not unknown
+                block.addLocation(g);
               }
             }
           }
@@ -721,8 +774,8 @@ public class Balancer {
 
     /* Decide if the given block is a good candidate to move or not */
     private boolean isGoodBlockCandidate(BalancerBlock block) {
-      for (NodeTask nodeTask : nodeTasks) {
-        if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) {
+      for (Task t : tasks) {
+        if (Balancer.this.isGoodBlockCandidate(this, t.target, block)) {
           return true;
         }
       }
@@ -737,20 +790,20 @@ public class Balancer {
      * The block should be dispatched immediately after this method is returned.
      */
     private PendingBlockMove chooseNextBlockToMove() {
-      for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) {
-        NodeTask task = tasks.next();
-        BalancerDatanode target = task.getDatanode();
+      for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
+        final Task task = i.next();
+        final BalancerDatanode target = task.target.getBalancerDatanode();
         PendingBlockMove pendingBlock = new PendingBlockMove();
         if (target.addPendingBlock(pendingBlock)) { 
           // target is not busy, so do a tentative block allocation
           pendingBlock.source = this;
-          pendingBlock.target = target;
+          pendingBlock.target = task.target;
           if ( pendingBlock.chooseBlockAndProxy() ) {
             long blockSize = pendingBlock.block.getNumBytes();
-            decScheduledSize(blockSize);
+            incScheduledSize(-blockSize);
             task.size -= blockSize;
             if (task.size == 0) {
-              tasks.remove();
+              i.remove();
             }
             return pendingBlock;
           } else {
@@ -824,7 +877,7 @@ public class Balancer {
           // in case no blocks can be moved for source node's task,
           // jump out of while-loop after 5 iterations.
           if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
-            setScheduledSize(0);
+            resetScheduledSize();
           }
         }
         
@@ -869,6 +922,8 @@ public class Balancer {
   Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
     this.threshold = p.threshold;
     this.policy = p.policy;
+    this.nodesToBeExcluded = p.nodesToBeExcluded;
+    this.nodesToBeIncluded = p.nodesToBeIncluded;
     this.nnc = theblockpool;
     cluster = NetworkTopology.getInstance(conf);
 
@@ -889,95 +944,154 @@ public class Balancer {
         IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
   }
   
-  /* Given a data node set, build a network topology and decide
-   * over-utilized datanodes, above average utilized datanodes, 
-   * below average utilized datanodes, and underutilized datanodes. 
-   * The input data node set is shuffled before the datanodes 
-   * are put into the over-utilized datanodes, above average utilized
-   * datanodes, below average utilized datanodes, and
-   * underutilized datanodes lists. This will add some randomness
-   * to the node matching later on.
-   * 
+  
+  private static long getCapacity(DatanodeStorageReport report, StorageType t) {
+    long capacity = 0L;
+    for(StorageReport r : report.getStorageReports()) {
+      if (r.getStorage().getStorageType() == t) {
+        capacity += r.getCapacity();
+      }
+    }
+    return capacity;
+  }
+
+  private static long getRemaining(DatanodeStorageReport report, StorageType t) {
+    long remaining = 0L;
+    for(StorageReport r : report.getStorageReports()) {
+      if (r.getStorage().getStorageType() == t) {
+        remaining += r.getRemaining();
+      }
+    }
+    return remaining;
+  }
+
+  private boolean shouldIgnore(DatanodeInfo dn) {
+    //ignore decommissioned nodes
+    final boolean decommissioned = dn.isDecommissioned();
+    //ignore decommissioning nodes
+    final boolean decommissioning = dn.isDecommissionInProgress();
+    // ignore nodes in exclude list
+    final boolean excluded = Util.shouldBeExcluded(nodesToBeExcluded, dn);
+    // ignore nodes not in the include list (if include list is not empty)
+    final boolean notIncluded = !Util.shouldBeIncluded(nodesToBeIncluded, dn);
+    
+    if (decommissioned || decommissioning || excluded || notIncluded) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
+            + decommissioning + ", " + excluded + ", " + notIncluded);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Given a datanode storage set, build a network topology and decide
+   * over-utilized storages, above average utilized storages, 
+   * below average utilized storages, and underutilized storages. 
+   * The input datanode storage set is shuffled in order to randomize
+   * to the storage matching later on.
+   *
    * @return the total number of bytes that are 
    *                needed to move to make the cluster balanced.
-   * @param datanodes a set of datanodes
+   * @param reports a set of datanode storage reports
    */
-  private long initNodes(DatanodeInfo[] datanodes) {
+  private long init(DatanodeStorageReport[] reports) {
     // compute average utilization
-    for (DatanodeInfo datanode : datanodes) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
-        continue; // ignore decommissioning or decommissioned nodes
+    for (DatanodeStorageReport r : reports) {
+      if (shouldIgnore(r.getDatanodeInfo())) {
+        continue; 
       }
-      policy.accumulateSpaces(datanode);
+      policy.accumulateSpaces(r);
     }
     policy.initAvgUtilization();
 
-    /*create network topology and all data node lists: 
-     * overloaded, above-average, below-average, and underloaded
-     * we alternates the accessing of the given datanodes array either by
-     * an increasing order or a decreasing order.
-     */  
+    // create network topology and classify utilization collections: 
+    //   over-utilized, above-average, below-average and under-utilized.
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
-    for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+    for(DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
+      final DatanodeInfo datanode = r.getDatanodeInfo();
+      if (shouldIgnore(datanode)) {
         continue; // ignore decommissioning or decommissioned nodes
       }
       cluster.add(datanode);
-      BalancerDatanode datanodeS;
-      final double avg = policy.getAvgUtilization();
-      if (policy.getUtilization(datanode) >= avg) {
-        datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode);
-        if (isAboveAvgUtilized(datanodeS)) {
-          this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
-        } else {
-          assert(isOverUtilized(datanodeS)) :
-            datanodeS.getDisplayName()+ "is not an overUtilized node";
-          this.overUtilizedDatanodes.add((Source)datanodeS);
-          overLoadedBytes += (long)((datanodeS.utilization-avg
-              -threshold)*datanodeS.datanode.getCapacity()/100.0);
+
+      final BalancerDatanode dn = new BalancerDatanode(r, underLoadedBytes,
+          maxConcurrentMovesPerNode);
+      for(StorageType t : StorageType.asList()) {
+        final Double utilization = policy.getUtilization(r, t);
+        if (utilization == null) { // datanode does not have such storage type 
+          continue;
         }
-      } else {
-        datanodeS = new BalancerDatanode(datanode, policy, threshold,
-            maxConcurrentMovesPerNode);
-        if ( isBelowOrEqualAvgUtilized(datanodeS)) {
-          this.belowAvgUtilizedDatanodes.add(datanodeS);
+        
+        final long capacity = getCapacity(r, t);
+        final double utilizationDiff = utilization - policy.getAvgUtilization(t);
+        final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
+        final long maxSize2Move = computeMaxSize2Move(capacity,
+            getRemaining(r, t), utilizationDiff, threshold);
+
+        final BalancerDatanode.StorageGroup g;
+        if (utilizationDiff > 0) {
+          final Source s = dn.addSource(t, utilization, maxSize2Move, this);
+          if (thresholdDiff <= 0) { // within threshold
+            aboveAvgUtilized.add(s);
+          } else {
+            overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            overUtilized.add(s);
+          }
+          g = s;
         } else {
-          assert isUnderUtilized(datanodeS) : "isUnderUtilized("
-              + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS)
-              + ", utilization=" + datanodeS.utilization; 
-          this.underUtilizedDatanodes.add(datanodeS);
-          underLoadedBytes += (long)((avg-threshold-
-              datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
+          g = dn.addStorageGroup(t, utilization, maxSize2Move);
+          if (thresholdDiff <= 0) { // within threshold
+            belowAvgUtilized.add(g);
+          } else {
+            underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            underUtilized.add(g);
+          }
         }
+        storageGroupMap.put(g);
       }
-      datanodeMap.put(datanode.getDatanodeUuid(), datanodeS);
     }
 
-    //logging
-    logNodes();
+    logUtilizationCollections();
     
-    assert (this.datanodeMap.size() == 
-      overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
-      aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
-      : "Mismatched number of datanodes";
+    Preconditions.checkState(storageGroupMap.size() == overUtilized.size()
+        + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(),
+        "Mismatched number of storage groups");
     
     // return number of bytes to be moved in order to make the cluster balanced
     return Math.max(overLoadedBytes, underLoadedBytes);
   }
 
+  private static long computeMaxSize2Move(final long capacity, final long remaining,
+      final double utilizationDiff, final double threshold) {
+    final double diff = Math.min(threshold, Math.abs(utilizationDiff));
+    long maxSizeToMove = precentage2bytes(diff, capacity);
+    if (utilizationDiff < 0) {
+      maxSizeToMove = Math.min(remaining, maxSizeToMove);
+    }
+    return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+  }
+
+  private static long precentage2bytes(double precentage, long capacity) {
+    Preconditions.checkArgument(precentage >= 0,
+        "precentage = " + precentage + " < 0");
+    return (long)(precentage * capacity / 100.0);
+  }
+
   /* log the over utilized & under utilized nodes */
-  private void logNodes() {
-    logNodes("over-utilized", overUtilizedDatanodes);
+  private void logUtilizationCollections() {
+    logUtilizationCollection("over-utilized", overUtilized);
     if (LOG.isTraceEnabled()) {
-      logNodes("above-average", aboveAvgUtilizedDatanodes);
-      logNodes("below-average", belowAvgUtilizedDatanodes);
+      logUtilizationCollection("above-average", aboveAvgUtilized);
+      logUtilizationCollection("below-average", belowAvgUtilized);
     }
-    logNodes("underutilized", underUtilizedDatanodes);
+    logUtilizationCollection("underutilized", underUtilized);
   }
 
-  private static <T extends BalancerDatanode> void logNodes(
-      String name, Collection<T> nodes) {
-    LOG.info(nodes.size() + " " + name + ": " + nodes);
+  private static <T extends BalancerDatanode.StorageGroup>
+      void logUtilizationCollection(String name, Collection<T> items) {
+    LOG.info(items.size() + " " + name + ": " + items);
   }
 
   /** A matcher interface for matching nodes. */
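
A quick worked example of the sizing above (not part of the patch): for a storage group whose utilization is 15 percentage points above the average, with threshold = 10 and capacity = 4 TB, computeMaxSize2Move() takes min(10, 15) = 10, precentage2bytes(10, 4 TB) = 400 GB, and the result is then capped by MAX_SIZE_TO_MOVE to 10 GB scheduled per balancing iteration.
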
@@ -1013,26 +1127,24 @@ public class Balancer {
   /**
    * Decide all <source, target> pairs and
    * the number of bytes to move from a source to a target
-   * Maximum bytes to be moved per node is
-   * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
-   * Return total number of bytes to move in this iteration
+   * Maximum bytes to be moved per storage group is
+   * min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
+   * @return total number of bytes to move in this iteration
    */
-  private long chooseNodes() {
+  private long chooseStorageGroups() {
     // First, match nodes on the same node group if cluster is node group aware
     if (cluster.isNodeGroupAware()) {
-      chooseNodes(SAME_NODE_GROUP);
+      chooseStorageGroups(SAME_NODE_GROUP);
     }
     
     // Then, match nodes on the same rack
-    chooseNodes(SAME_RACK);
+    chooseStorageGroups(SAME_RACK);
     // At last, match all remaining nodes
-    chooseNodes(ANY_OTHER);
+    chooseStorageGroups(ANY_OTHER);
     
-    assert (datanodeMap.size() >= sources.size()+targets.size())
-      : "Mismatched number of datanodes (" +
-      datanodeMap.size() + " total, " +
-      sources.size() + " sources, " +
-      targets.size() + " targets)";
+    Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
+        "Mismatched number of datanodes (" + storageGroupMap.size() + " < "
+            + sources.size() + " sources, " + targets.size() + " targets)");
 
     long bytesToMove = 0L;
     for (Source src : sources) {
@@ -1042,25 +1154,25 @@ public class Balancer {
   }
 
   /** Decide all <source, target> pairs according to the matcher. */
-  private void chooseNodes(final Matcher matcher) {
+  private void chooseStorageGroups(final Matcher matcher) {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
-    chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
+    chooseStorageGroups(overUtilized, underUtilized, matcher);
     
     /* match each remaining overutilized datanode (source) to 
      * below average utilized datanodes (targets).
      * Note only overutilized datanodes that haven't had that max bytes to move
      * satisfied in step 1 are selected
      */
-    chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
+    chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
 
     /* match each remaining underutilized datanode (target) to 
      * above average utilized datanodes (source).
      * Note only underutilized datanodes that have not had that max bytes to
      * move satisfied in step 1 are selected.
      */
-    chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
+    chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
   }
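
The three calls above only fix the pairing priority: over-utilized groups are drained into under-utilized ones first, then into below-average ones, and finally the remaining under-utilized groups are filled from above-average ones. A rough sketch of that ordering, with hypothetical node names and a trivial pairing step that ignores sizes and the source/target direction of the last pass:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    class PairingOrderExample {
      /** Pair each entry of the first list with the first remaining entry of the second. */
      static List<String[]> pair(List<String> a, List<String> b) {
        List<String[]> pairs = new ArrayList<String[]>();
        for (Iterator<String> i = a.iterator(); i.hasNext() && !b.isEmpty();) {
          pairs.add(new String[] { i.next(), b.remove(0) });
          i.remove();
        }
        return pairs;
      }

      public static void main(String[] args) {
        List<String> overUtilized = new ArrayList<String>(Arrays.asList("dn1", "dn2"));
        List<String> aboveAvgUtilized = new ArrayList<String>(Arrays.asList("dn3"));
        List<String> belowAvgUtilized = new ArrayList<String>(Arrays.asList("dn4"));
        List<String> underUtilized = new ArrayList<String>(Arrays.asList("dn5"));

        List<String[]> all = new ArrayList<String[]>();
        all.addAll(pair(overUtilized, underUtilized));     // pass 1
        all.addAll(pair(overUtilized, belowAvgUtilized));  // pass 2
        all.addAll(pair(underUtilized, aboveAvgUtilized)); // pass 3
        for (String[] p : all) {
          System.out.println(p[0] + " <-> " + p[1]);
        }
      }
    }
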
 
   /**
@@ -1068,13 +1180,14 @@ public class Balancer {
    * datanodes or the candidates are source nodes with (utilization > Avg), and
    * the others are target nodes with (utilization < Avg).
    */
-  private <D extends BalancerDatanode, C extends BalancerDatanode> void 
-      chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
+  private <G extends BalancerDatanode.StorageGroup,
+           C extends BalancerDatanode.StorageGroup>
+      void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
           Matcher matcher) {
-    for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
-      final D datanode = i.next();
-      for(; chooseForOneDatanode(datanode, candidates, matcher); );
-      if (!datanode.hasSpaceForScheduling()) {
+    for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
+      final G g = i.next();
+      for(; choose4One(g, candidates, matcher); );
+      if (!g.hasSpaceForScheduling()) {
         i.remove();
       }
     }
@@ -1084,18 +1197,19 @@ public class Balancer {
    * For the given datanode, choose a candidate and then schedule it.
   * @return true if a candidate is chosen; false if no candidate is chosen.
    */
-  private <C extends BalancerDatanode> boolean chooseForOneDatanode(
-      BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
+  private <C extends BalancerDatanode.StorageGroup>
+      boolean choose4One(BalancerDatanode.StorageGroup g,
+          Collection<C> candidates, Matcher matcher) {
     final Iterator<C> i = candidates.iterator();
-    final C chosen = chooseCandidate(dn, i, matcher);
-
+    final C chosen = chooseCandidate(g, i, matcher);
+  
     if (chosen == null) {
       return false;
     }
-    if (dn instanceof Source) {
-      matchSourceWithTargetToMove((Source)dn, chosen);
+    if (g instanceof Source) {
+      matchSourceWithTargetToMove((Source)g, chosen);
     } else {
-      matchSourceWithTargetToMove((Source)chosen, dn);
+      matchSourceWithTargetToMove((Source)chosen, g);
     }
     if (!chosen.hasSpaceForScheduling()) {
       i.remove();
@@ -1103,27 +1217,28 @@ public class Balancer {
     return true;
   }
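
The candidate scan above walks the iterator once, permanently dropping candidates that have no scheduling capacity left and returning the first one the matcher accepts. A self-contained sketch of that pattern, with a plain capacity check standing in for the matcher and all names hypothetical:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    class ChooseCandidateExample {
      static class Candidate {
        final String name;
        long available;
        Candidate(String name, long available) { this.name = name; this.available = available; }
      }

      static Candidate choose(Iterator<Candidate> candidates, long minNeeded) {
        while (candidates.hasNext()) {
          Candidate c = candidates.next();
          if (c.available <= 0) {
            candidates.remove();          // exhausted: never consider it again
          } else if (c.available >= minNeeded) {
            return c;                     // simple stand-in for matcher.match(...)
          }
        }
        return null;
      }

      public static void main(String[] args) {
        List<Candidate> targets = new ArrayList<Candidate>(Arrays.asList(
            new Candidate("dn3", 0L), new Candidate("dn4", 2L << 30)));
        Candidate chosen = choose(targets.iterator(), 1L << 30);
        System.out.println(chosen == null ? "none" : chosen.name); // dn4
        System.out.println(targets.size());                        // 1 (dn3 was removed)
      }
    }
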
   
-  private void matchSourceWithTargetToMove(
-      Source source, BalancerDatanode target) {
+  private void matchSourceWithTargetToMove(Source source,
+      BalancerDatanode.StorageGroup target) {
     long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
-    NodeTask nodeTask = new NodeTask(target, size);
-    source.addNodeTask(nodeTask);
-    target.incScheduledSize(nodeTask.getSize());
+    final Task task = new Task(target, size);
+    source.addTask(task);
+    target.incScheduledSize(task.size);
     sources.add(source);
     targets.add(target);
     LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-        +source.datanode.getName() + " to " + target.datanode.getName());
+        + source.getDisplayName() + " to " + target.getDisplayName());
   }
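
The size scheduled for a new task is simply the smaller of what the source can still give away and what the target can still accept, and both sides' bookkeeping advances by that amount. A tiny sketch of that accounting, using illustrative fields rather than the patch's classes:

    class ScheduleMoveExample {
      long sourceAvailable = 7L << 30;   // source may still move 7 GB
      long targetAvailable = 5L << 30;   // target may still accept 5 GB
      long targetScheduled = 0L;

      long scheduleOneMove() {
        long size = Math.min(sourceAvailable, targetAvailable);
        sourceAvailable -= size;         // source side, analogous to addTask(...)
        targetScheduled += size;         // target side, analogous to incScheduledSize(...)
        targetAvailable -= size;
        return size;
      }

      public static void main(String[] args) {
        System.out.println(new ScheduleMoveExample().scheduleOneMove()); // 5 GB in bytes
      }
    }
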
   
   /** Choose a candidate for the given datanode. */
-  private <D extends BalancerDatanode, C extends BalancerDatanode>
-      C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
-    if (dn.hasSpaceForScheduling()) {
+  private <G extends BalancerDatanode.StorageGroup,
+           C extends BalancerDatanode.StorageGroup>
+      C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
+    if (g.hasSpaceForScheduling()) {
       for(; candidates.hasNext(); ) {
         final C c = candidates.next();
         if (!c.hasSpaceForScheduling()) {
           candidates.remove();
-        } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
+        } else if (matcher.match(cluster, g.getDatanode(), c.getDatanode())) {
           return c;
         }
       }
@@ -1177,9 +1292,10 @@ public class Balancer {
     boolean shouldWait;
     do {
       shouldWait = false;
-      for (BalancerDatanode target : targets) {
-        if (!target.isPendingQEmpty()) {
+      for (BalancerDatanode.StorageGroup target : targets) {
+        if (!target.getBalancerDatanode().isPendingQEmpty()) {
           shouldWait = true;
+          break;
         }
       }
       if (shouldWait) {
@@ -1248,12 +1364,15 @@ public class Balancer {
    * 3. doing the move does not reduce the number of racks that the block has
    */
   private boolean isGoodBlockCandidate(Source source, 
-      BalancerDatanode target, BalancerBlock block) {
+      BalancerDatanode.StorageGroup target, BalancerBlock block) {
+    if (source.storageType != target.storageType) {
+      return false;
+    }
     // check if the block is moved or not
     if (movedBlocks.contains(block)) {
-        return false;
+      return false;
     }
-    if (block.isLocatedOnDatanode(target)) {
+    if (block.isLocatedOn(target)) {
       return false;
     }
     if (cluster.isNodeGroupAware() && 
@@ -1268,8 +1387,8 @@ public class Balancer {
     } else {
       boolean notOnSameRack = true;
       synchronized (block) {
-        for (BalancerDatanode loc : block.locations) {
-          if (cluster.isOnSameRack(loc.datanode, target.datanode)) {
+        for (BalancerDatanode.StorageGroup loc : block.locations) {
+          if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
             notOnSameRack = false;
             break;
           }
@@ -1280,9 +1399,9 @@ public class Balancer {
         goodBlock = true;
       } else {
         // good if source is on the same rack as one of the replicas
-        for (BalancerDatanode loc : block.locations) {
+        for (BalancerDatanode.StorageGroup loc : block.locations) {
           if (loc != source && 
-              cluster.isOnSameRack(loc.datanode, source.datanode)) {
+              cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
             goodBlock = true;
             break;
           }
@@ -1303,25 +1422,26 @@ public class Balancer {
    * @return true if there are any replica (other than source) on the same node
    * group with target
    */
-  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
       BalancerBlock block, Source source) {
-    for (BalancerDatanode loc : block.locations) {
+    final DatanodeInfo targetDn = target.getDatanode();
+    for (BalancerDatanode.StorageGroup loc : block.locations) {
       if (loc != source && 
-        cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
-          return true;
-        }
+          cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
+        return true;
       }
+    }
     return false;
   }
 
   /* reset all fields in a balancer preparing for the next iteration */
   private void resetData(Configuration conf) {
     this.cluster = NetworkTopology.getInstance(conf);
-    this.overUtilizedDatanodes.clear();
-    this.aboveAvgUtilizedDatanodes.clear();
-    this.belowAvgUtilizedDatanodes.clear();
-    this.underUtilizedDatanodes.clear();
-    this.datanodeMap.clear();
+    this.overUtilized.clear();
+    this.aboveAvgUtilized.clear();
+    this.belowAvgUtilized.clear();
+    this.underUtilized.clear();
+    this.storageGroupMap.clear();
     this.sources.clear();
     this.targets.clear();  
     this.policy.reset();
@@ -1341,32 +1461,6 @@ public class Balancer {
       }
     }
   }
-  
-  /* Return true if the given datanode is overUtilized */
-  private boolean isOverUtilized(BalancerDatanode datanode) {
-    return datanode.utilization > (policy.getAvgUtilization()+threshold);
-  }
-  
-  /* Return true if the given datanode is above or equal to average utilized
-   * but not overUtilized */
-  private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
-    final double avg = policy.getAvgUtilization();
-    return (datanode.utilization <= (avg+threshold))
-        && (datanode.utilization >= avg);
-  }
-  
-  /* Return true if the given datanode is underUtilized */
-  private boolean isUnderUtilized(BalancerDatanode datanode) {
-    return datanode.utilization < (policy.getAvgUtilization()-threshold);
-  }
-
-  /* Return true if the given datanode is below average utilized 
-   * but not underUtilized */
-  private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
-    final double avg = policy.getAvgUtilization();
-    return (datanode.utilization >= (avg-threshold))
-             && (datanode.utilization <= avg);
-  }
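
The four helpers removed here encode the utilization bands the balancer distinguishes relative to the cluster average and the threshold. A standalone restatement of those bands, with illustrative names only:

    class UtilizationBandsExample {
      enum Band { OVER, ABOVE_AVG, BELOW_AVG, UNDER }

      static Band classify(double utilization, double avg, double threshold) {
        if (utilization > avg + threshold) {
          return Band.OVER;           // must give blocks away
        } else if (utilization >= avg) {
          return Band.ABOVE_AVG;      // above average but within the threshold
        } else if (utilization >= avg - threshold) {
          return Band.BELOW_AVG;      // below average but within the threshold
        } else {
          return Band.UNDER;          // should receive blocks
        }
      }

      public static void main(String[] args) {
        System.out.println(classify(85.0, 60.0, 10.0)); // OVER
        System.out.println(classify(45.0, 60.0, 10.0)); // UNDER
      }
    }
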
 
   // Exit status
   enum ReturnStatus {
@@ -1394,7 +1488,8 @@ public class Balancer {
       /* get all live datanodes of a cluster and their disk usage
        * decide the number of bytes need to be moved
        */
-      final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+      final long bytesLeftToMove = init(
+          nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE));
       if (bytesLeftToMove == 0) {
         System.out.println("The cluster is balanced. Exiting...");
         return ReturnStatus.SUCCESS;
@@ -1408,7 +1503,7 @@ public class Balancer {
        * in this iteration. Maximum bytes to be moved per node is
        * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
        */
-      final long bytesToMove = chooseNodes();
+      final long bytesToMove = chooseStorageGroups();
       if (bytesToMove == 0) {
         System.out.println("No block can be moved. Exiting...");
         return ReturnStatus.NO_MOVE_BLOCK;
@@ -1526,21 +1621,101 @@ public class Balancer {
   }
 
   static class Parameters {
-    static final Parameters DEFALUT = new Parameters(
-        BalancingPolicy.Node.INSTANCE, 10.0);
+    static final Parameters DEFAULT = new Parameters(
+        BalancingPolicy.Node.INSTANCE, 10.0,
+        Collections.<String> emptySet(), Collections.<String> emptySet());
 
     final BalancingPolicy policy;
     final double threshold;
+    // exclude the nodes in this set from balancing operations
+    Set<String> nodesToBeExcluded;
+    //include only these nodes in balancing operations
+    Set<String> nodesToBeIncluded;
 
-    Parameters(BalancingPolicy policy, double threshold) {
+    Parameters(BalancingPolicy policy, double threshold,
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
       this.policy = policy;
       this.threshold = threshold;
+      this.nodesToBeExcluded = nodesToBeExcluded;
+      this.nodesToBeIncluded = nodesToBeIncluded;
     }
 
     @Override
     public String toString() {
       return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
-          + "[" + policy + ", threshold=" + threshold + "]";
+          + "[" + policy + ", threshold=" + threshold +
+          ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
+          ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
+    }
+  }
+
+  static class Util {
+
+    /**
+     * @param excludedNodes set of datanodes to exclude
+     * @param datanode the datanode to check
+     * @return true if the datanode is in excludedNodes.
+     */
+    static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) {
+      return isIn(excludedNodes, datanode);
+    }
+
+    /**
+     * @param includedNodes set of datanodes to include; empty means all nodes
+     * @param datanode the datanode to check
+     * @return true if includedNodes is empty or the datanode is in includedNodes.
+     */
+    static boolean shouldBeIncluded(Set<String> includedNodes, DatanodeInfo datanode) {
+      return (includedNodes.isEmpty() ||
+          isIn(includedNodes, datanode));
+    }
+    /**
+     * Matching is checked against the host name and the IP address, each with
+     * and without the port number.
+     * @param datanodeSet set of host or host:port entries
+     * @param datanode the datanode to check
+     * @return true if the datanode's transfer address matches the set of nodes.
+     */
+    private static boolean isIn(Set<String> datanodeSet, DatanodeInfo datanode) {
+      return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) ||
+          isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) ||
+          isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort());
+    }
+
+    /**
+     * Check whether the set contains the given host, with or without the port.
+     * @param nodes set of host or host:port entries
+     * @param host host name or IP address
+     * @param port transfer port
+     * @return true if nodes contains host or host:port
+     */
+    private static boolean isIn(Set<String> nodes, String host, int port) {
+      if (host == null) {
+        return false;
+      }
+      return (nodes.contains(host) || nodes.contains(host +":"+ port));
+    }
+
+    /**
+     * Parse a comma-separated string into a set of host names.
+     * @param string comma-separated host names
+     * @return the set of host names
+     */
+    static Set<String> parseHostList(String string) {
+      String[] addrs = StringUtils.getTrimmedStrings(string);
+      return new HashSet<String>(Arrays.asList(addrs));
+    }
+
+    /**
+     * Read a set of host names from a file.
+     * @param fileName path of the file to read
+     * @return the set of host names read from the file
+     */
+    static Set<String> getHostListFromFile(String fileName) {
+      Set<String> nodes = new HashSet <String> ();
+      try {
+        HostsFileReader.readFileToSet("nodes", fileName, nodes);
+        return StringUtils.getTrimmedStrings(nodes);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Unable to open file: " + fileName);
+      }
     }
   }
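
A node matches an include/exclude set if its peer host name, IP address, or host name appears in the set, either bare or suffixed with the transfer port. A short standalone sketch of that matching rule, with hypothetical host names and port:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    class HostMatchExample {
      static boolean isIn(Set<String> nodes, String host, int port) {
        if (host == null) {
          return false;
        }
        return nodes.contains(host) || nodes.contains(host + ":" + port);
      }

      public static void main(String[] args) {
        Set<String> excluded = new HashSet<String>(
            Arrays.asList("dn1.example.com", "10.0.0.7:50010"));
        System.out.println(isIn(excluded, "dn1.example.com", 50010)); // true (bare host)
        System.out.println(isIn(excluded, "10.0.0.7", 50010));        // true (ip:port)
        System.out.println(isIn(excluded, "10.0.0.8", 50010));        // false
      }
    }
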
 
@@ -1578,15 +1753,17 @@ public class Balancer {
 
     /** parse command line arguments */
     static Parameters parse(String[] args) {
-      BalancingPolicy policy = Parameters.DEFALUT.policy;
-      double threshold = Parameters.DEFALUT.threshold;
+      BalancingPolicy policy = Parameters.DEFAULT.policy;
+      double threshold = Parameters.DEFAULT.threshold;
+      Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
+      Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
 
       if (args != null) {
         try {
           for(int i = 0; i < args.length; i++) {
-            checkArgument(args.length >= 2, "args = " + Arrays.toString(args));
             if ("-threshold".equalsIgnoreCase(args[i])) {
-              i++;
+              checkArgument(++i < args.length,
+                "Threshold value is missing: args = " + Arrays.toString(args));
               try {
                 threshold = Double.parseDouble(args[i]);
                 if (threshold < 1 || threshold > 100) {
@@ -1601,25 +1778,52 @@ public class Balancer {
                 throw e;
               }
             } else if ("-policy".equalsIgnoreCase(args[i])) {
-              i++;
+              checkArgument(++i < args.length,
+                "Policy value is missing: args = " + Arrays.toString(args));
               try {
                 policy = BalancingPolicy.parse(args[i]);
               } catch(IllegalArgumentException e) {
                 System.err.println("Illegal policy name: " + args[i]);
                 throw e;
               }
+            } else if ("-exclude".equalsIgnoreCase(args[i])) {
+              checkArgument(++i < args.length,
+                  "List of nodes to exclude | -f <filename> is missing: args = "
+                  + Arrays.toString(args));
+              if ("-f".equalsIgnoreCase(args[i])) {
+                checkArgument(++i < args.length,
+                    "File containing nodes to exclude is not specified: args = "
+                    + Arrays.toString(args));
+                nodesTobeExcluded = Util.getHostListFromFile(args[i]);
+              } else {
+                nodesTobeExcluded = Util.parseHostList(args[i]);
+              }
+            } else if ("-include".equalsIgnoreCase(args[i])) {
+              checkArgument(++i < args.length,
+                "List of nodes to include | -f <filename> is missing: args = "
+                + Arrays.toString(args));
+              if ("-f".equalsIgnoreCase(args[i])) {
+                checkArgument(++i < args.length,
+                    "File containing nodes to include is not specified: args = "
+                    + Arrays.toString(args));
+                nodesTobeIncluded = Util.getHostListFromFile(args[i]);
+               } else {
+                nodesTobeIncluded = Util.parseHostList(args[i]);
+              }
             } else {
               throw new IllegalArgumentException("args = "
                   + Arrays.toString(args));
             }
           }
+          checkArgument(nodesTobeExcluded.isEmpty() || nodesTobeIncluded.isEmpty(),
+              "-exclude and -include options cannot be specified together.");
         } catch(RuntimeException e) {
           printUsage(System.err);
           throw e;
         }
       }
       
-      return new Parameters(policy, threshold);
+      return new Parameters(policy, threshold, nodesTobeExcluded, nodesTobeIncluded);
     }
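
The new -exclude and -include options each accept either an inline comma-separated host list or "-f <filename>", and the parser rejects using both options together. A hedged sketch of the resulting parameter shapes; the splitting and the construction below are illustrative, not the exact helpers above:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    class ExcludeIncludeArgsExample {
      static Set<String> parseHostList(String s) {
        Set<String> out = new HashSet<String>();
        for (String host : s.split(",")) {
          String trimmed = host.trim();
          if (!trimmed.isEmpty()) {
            out.add(trimmed);
          }
        }
        return out;
      }

      public static void main(String[] args) {
        Set<String> exclude = parseHostList("dn1.example.com, dn2.example.com:50010");
        Set<String> include = Collections.emptySet(); // empty set means "all nodes"
        if (!exclude.isEmpty() && !include.isEmpty()) {
          throw new IllegalArgumentException(
              "-exclude and -include options cannot be specified together.");
        }
        System.out.println("excluding " + exclude.size() + " nodes");
      }
    }
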
 
     private static void printUsage(PrintStream out) {

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java Tue Aug  5 02:30:54 2014
@@ -18,7 +18,11 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.hdfs.util.EnumDoubles;
 
 /**
  * Balancing policy.
@@ -28,31 +32,43 @@ import org.apache.hadoop.hdfs.protocol.D
  */
 @InterfaceAudience.Private
 abstract class BalancingPolicy {
-  long totalCapacity;
-  long totalUsedSpace;
-  private double avgUtilization;
+  final EnumCounters<StorageType> totalCapacities
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumCounters<StorageType> totalUsedSpaces
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumDoubles<StorageType> avgUtilizations
+      = new EnumDoubles<StorageType>(StorageType.class);
 
   void reset() {
-    totalCapacity = 0L;
-    totalUsedSpace = 0L;
-    avgUtilization = 0.0;
+    totalCapacities.reset();
+    totalUsedSpaces.reset();
+    avgUtilizations.reset();
   }
 
   /** Get the policy name. */
   abstract String getName();
 
   /** Accumulate used space and capacity. */
-  abstract void accumulateSpaces(DatanodeInfo d);
+  abstract void accumulateSpaces(DatanodeStorageReport r);
 
   void initAvgUtilization() {
-    this.avgUtilization = totalUsedSpace*100.0/totalCapacity;
+    for(StorageType t : StorageType.asList()) {
+      final long capacity = totalCapacities.get(t);
+      if (capacity > 0L) {
+        final double avg  = totalUsedSpaces.get(t)*100.0/capacity;
+        avgUtilizations.set(t, avg);
+      }
+    }
   }
-  double getAvgUtilization() {
-    return avgUtilization;
+
+  double getAvgUtilization(StorageType t) {
+    return avgUtilizations.get(t);
   }
 
-  /** Return the utilization of a datanode */
-  abstract double getUtilization(DatanodeInfo d);
+  /** @return the utilization of a particular storage type of a datanode;
+   *          or null if the datanode does not have such a storage type.
+   */
+  abstract Double getUtilization(DatanodeStorageReport r, StorageType t);
   
   @Override
   public String toString() {
@@ -84,14 +100,25 @@ abstract class BalancingPolicy {
     }
 
     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getDfsUsed();  
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getDfsUsed());
+      }
     }
     
     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getDfsUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long dfsUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          dfsUsed += s.getDfsUsed();
+        }
+      }
+      return capacity == 0L? null: dfsUsed*100.0/capacity;
     }
   }
 
@@ -108,14 +135,25 @@ abstract class BalancingPolicy {
     }
 
     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getBlockPoolUsed();  
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getBlockPoolUsed());
+      }
     }
 
     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getBlockPoolUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long blockPoolUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          blockPoolUsed += s.getBlockPoolUsed();
+        }
+      }
+      return capacity == 0L? null: blockPoolUsed*100.0/capacity;
     }
   }
 }