Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2014/08/01 03:29:54 UTC

svn commit: r1615020 [1/2] - in /hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/...

Author: szetszwo
Date: Fri Aug  1 01:29:49 2014
New Revision: 1615020

URL: http://svn.apache.org/r1615020
Log:
Merge r1609845 through r1615019 from trunk.

Added:
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java
      - copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/FsActionParam.java
      - copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/FsActionParam.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemWithXAttrs.java
      - copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemWithXAttrs.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsWithXAttrs.java
      - copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsWithXAttrs.java
Modified:
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/XmlEditsVisitor.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/WebHDFS.apt.vm
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsUrl.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestPermissionSymlinks.java

Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1614232-1615019

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug  1 01:29:49 2014
@@ -204,9 +204,6 @@ Trunk (Unreleased)
 
     HDFS-3549. Fix dist tar build fails in hadoop-hdfs-raid project. (Jason Lowe via daryn)
 
-    HDFS-3482. hdfs balancer throws ArrayIndexOutOfBoundsException 
-    if option is specified without values. ( Madhukara Phatak via umamahesh) 
-
     HDFS-3614. Revert unused MiniDFSCluster constructor from HDFS-3049.
     (acmurthy via eli)
 
@@ -346,6 +343,20 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-6739. Add getDatanodeStorageReport to ClientProtocol. (szetszwo)
 
+    HDFS-6665. Add tests for XAttrs in combination with viewfs.
+    (Stephen Chu via wang)
+
+    HDFS-6778. The extended attributes javadoc should simply refer to the
+    user docs. (clamb via wang)
+
+    HDFS-6570. add api that enables checking if a user has certain permissions on
+    a file. (Jitendra Pandey via cnauroth)
+
+    HDFS-6441. Add ability to exclude/include specific datanodes while
+    balancing. (Benoy Antony and Yu Li via Arpit Agarwal)
+
+    HDFS-6685. Balancer should preserve storage type of replicas.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
@@ -408,6 +419,16 @@ Release 2.6.0 - UNRELEASED
     HDFS-6749. FSNamesystem methods should call resolvePath.
     (Charles Lamb via cnauroth)
 
+    HDFS-4629. Using com.sun.org.apache.xml.internal.serialize.* in
+    XmlEditsVisitor.java is JVM vendor specific. Breaks IBM JAVA.
+    (Amir Sanjar via stevel)
+
+    HDFS-3482. hdfs balancer throws ArrayIndexOutOfBoundsException 
+    if option is specified without values. ( Madhukara Phatak via umamahesh) 
+
+    HDFS-6797. DataNode logs wrong layoutversion during upgrade. (Benoy Antony
+    via Arpit Agarwal)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -963,6 +984,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6717. JIRA HDFS-5804 breaks default nfs-gateway behavior for unsecured config
     (brandonli)
 
+    HDFS-6768. Fix a few unit tests that use hard-coded port numbers. (Arpit
+    Agarwal)
+
   BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
 
     HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml Fri Aug  1 01:29:49 2014
@@ -176,6 +176,11 @@ http://maven.apache.org/xsd/maven-4.0.0.
       <artifactId>netty</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>xerces</groupId>
+      <artifactId>xercesImpl</artifactId>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
 
   <build>

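The new xercesImpl dependency backs HDFS-4629 (listed in CHANGES.txt above): XmlEditsVisitor moves off the JVM-vendor-specific com.sun.org.apache.xml.internal.serialize classes, which are not available on IBM Java. The XmlEditsVisitor diff itself is not shown in this part of the commit; the sketch below only illustrates the portable pattern, assuming the org.apache.xml.serialize.OutputFormat/XMLSerializer API shipped in xercesImpl.

  import java.io.IOException;
  import java.io.OutputStream;

  import org.apache.xml.serialize.OutputFormat;
  import org.apache.xml.serialize.XMLSerializer;
  import org.xml.sax.ContentHandler;

  public class XercesSaxOutput {
    /** Build a SAX ContentHandler backed by Xerces rather than com.sun.* internals. */
    public static ContentHandler newContentHandler(OutputStream out) throws IOException {
      OutputFormat format = new OutputFormat("XML", "UTF-8", true); // method, encoding, indenting
      format.setIndent(2);
      XMLSerializer serializer = new XMLSerializer(out, format);
      return serializer.asContentHandler();
    }
  }
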
Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1614232-1615019

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Fri Aug  1 01:29:49 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
@@ -448,6 +449,11 @@ public class Hdfs extends AbstractFileSy
     dfs.removeXAttr(getUriPath(path), name);
   }
 
+  @Override
+  public void access(Path path, final FsAction mode) throws IOException {
+    dfs.checkAccess(getUriPath(path), mode);
+  }
+
   /**
    * Renew an existing delegation token.
    * 

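The access() override above wires the new permission-check call into the AbstractFileSystem API, so FileContext users reach DFSClient#checkAccess (added below). A small usage sketch, assuming the matching FileContext#access method added on the common side of HDFS-6570; the path is hypothetical.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileContext;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.permission.FsAction;
  import org.apache.hadoop.security.AccessControlException;

  public class AccessViaFileContext {
    public static void main(String[] args) throws Exception {
      FileContext fc = FileContext.getFileContext(new Configuration());
      Path p = new Path("/user/alice/report.csv");   // hypothetical path
      try {
        fc.access(p, FsAction.READ);                 // resolves to Hdfs#access for hdfs:// paths
        System.out.println("read permitted");
      } catch (AccessControlException ace) {
        System.out.println("read denied: " + ace.getMessage());
      }
    }
  }
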
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Aug  1 01:29:49 2014
@@ -122,6 +122,7 @@ import org.apache.hadoop.fs.XAttrSetFlag
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -2832,6 +2833,17 @@ public class DFSClient implements java.i
     }
   }
 
+  public void checkAccess(String src, FsAction mode) throws IOException {
+    checkOpen();
+    try {
+      namenode.checkAccess(src, mode);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+          FileNotFoundException.class,
+          UnresolvedPathException.class);
+    }
+  }
+
   @Override // RemotePeerFactory
   public Peer newConnectedPeer(InetSocketAddress addr,
       Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Aug  1 01:29:49 2014
@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -1898,4 +1899,23 @@ public class DistributedFileSystem exten
       }
     }.resolve(this, absF);
   }
+
+  @Override
+  public void access(Path path, final FsAction mode) throws IOException {
+    final Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p) throws IOException {
+        dfs.checkAccess(getPathName(p), mode);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        fs.access(p, mode);
+        return null;
+      }
+    }.resolve(this, absF);
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java Fri Aug  1 01:29:49 2014
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -35,4 +38,10 @@ public enum StorageType {
   public static final StorageType DEFAULT = DISK;
   
   public static final StorageType[] EMPTY_ARRAY = {};
-}
\ No newline at end of file
+  
+  private static final StorageType[] VALUES = values();
+  
+  public static List<StorageType> asList() {
+    return Arrays.asList(VALUES);
+  }
+}

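asList() gives a cached List view over values(), so callers can iterate storage types without cloning the enum array on every call; the Balancer changes later in this diff iterate per storage type when computing capacity and remaining space (see getCapacity/getRemaining below). A trivial usage sketch:

  import java.util.List;

  import org.apache.hadoop.hdfs.StorageType;

  public class StorageTypeListing {
    public static void main(String[] args) {
      List<StorageType> types = StorageType.asList();  // cached view, same backing array each call
      for (StorageType t : types) {
        System.out.println(t + (t == StorageType.DEFAULT ? " (default)" : ""));
      }
    }
  }
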
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Aug  1 01:29:49 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -1267,17 +1268,11 @@ public interface ClientProtocol {
   
   /**
    * Set xattr of a file or directory.
-   * A regular user only can set xattr of "user" namespace.
-   * A super user can set xattr of "user" and "trusted" namespace.
-   * XAttr of "security" and "system" namespace is only used/exposed 
-   * internally to the FS impl.
+   * The name must be prefixed with the namespace followed by ".". For example,
+   * "user.attr".
    * <p/>
-   * For xattr of "user" namespace, its access permissions are 
-   * defined by the file or directory permission bits.
-   * XAttr will be set only when login user has correct permissions.
-   * <p/>
-   * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
-   * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
+   * Refer to the HDFS extended attributes user documentation for details.
+   *
    * @param src file or directory
    * @param xAttr <code>XAttr</code> to set
    * @param flag set flag
@@ -1288,18 +1283,13 @@ public interface ClientProtocol {
       throws IOException;
   
   /**
-   * Get xattrs of file or directory. Values in xAttrs parameter are ignored.
-   * If xattrs is null or empty, equals getting all xattrs of the file or 
-   * directory.
-   * Only xattrs which login user has correct permissions will be returned. 
-   * <p/>
-   * A regular user only can get xattr of "user" namespace.
-   * A super user can get xattr of "user" and "trusted" namespace.
-   * XAttr of "security" and "system" namespace is only used/exposed 
-   * internally to the FS impl.
+   * Get xattrs of a file or directory. Values in xAttrs parameter are ignored.
+   * If xAttrs is null or empty, this is the same as getting all xattrs of the
+   * file or directory.  Only those xattrs for which the logged-in user has
+   * permissions to view are returned.
    * <p/>
-   * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
-   * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
+   * Refer to the HDFS extended attributes user documentation for details.
+   *
    * @param src file or directory
    * @param xAttrs xAttrs to get
    * @return List<XAttr> <code>XAttr</code> list 
@@ -1314,13 +1304,8 @@ public interface ClientProtocol {
    * Only the xattr names for which the logged in user has the permissions to
    * access will be returned.
    * <p/>
-   * A regular user only can get xattr names from the "user" namespace.
-   * A super user can get xattr names of the "user" and "trusted" namespace.
-   * XAttr names of the "security" and "system" namespaces are only used/exposed
-   * internally by the file system impl.
-   * <p/>
-   * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
-   * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
+   * Refer to the HDFS extended attributes user documentation for details.
+   *
    * @param src file or directory
    * @param xAttrs xAttrs to get
    * @return List<XAttr> <code>XAttr</code> list
@@ -1332,19 +1317,33 @@ public interface ClientProtocol {
   
   /**
    * Remove xattr of a file or directory.Value in xAttr parameter is ignored.
-   * Name must be prefixed with user/trusted/security/system.
+   * The name must be prefixed with the namespace followed by ".". For example,
+   * "user.attr".
    * <p/>
-   * A regular user only can remove xattr of "user" namespace.
-   * A super user can remove xattr of "user" and "trusted" namespace.
-   * XAttr of "security" and "system" namespace is only used/exposed 
-   * internally to the FS impl.
-   * <p/>
-   * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
-   * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
+   * Refer to the HDFS extended attributes user documentation for details.
+   *
    * @param src file or directory
    * @param xAttr <code>XAttr</code> to remove
    * @throws IOException
    */
   @AtMostOnce
   public void removeXAttr(String src, XAttr xAttr) throws IOException;
+
+  /**
+   * Checks if the user can access a path.  The mode specifies which access
+   * checks to perform.  If the requested permissions are granted, then the
+   * method returns normally.  If access is denied, then the method throws an
+   * {@link AccessControlException}.
+   * In general, applications should avoid using this method, due to the risk of
+   * time-of-check/time-of-use race conditions.  The permissions on a file may
+   * change immediately after the access call returns.
+   *
+   * @param path Path to check
+   * @param mode type of access to check
+   * @throws AccessControlException if access is denied
+   * @throws FileNotFoundException if the path does not exist
+   * @throws IOException see specific implementation
+   */
+  @Idempotent
+  public void checkAccess(String path, FsAction mode) throws IOException;
 }

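The checkAccess javadoc above calls out the time-of-check/time-of-use caveat. A hedged illustration using the public FileSystem#access wrapper added in DistributedFileSystem above; class and method names below are illustrative only.

  import java.io.IOException;

  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.permission.FsAction;
  import org.apache.hadoop.security.AccessControlException;

  public class CheckAccessCaveat {
    /** Racy: permissions may change between the access() check and the open(). */
    static FSDataInputStream racyOpen(FileSystem fs, Path p) throws IOException {
      fs.access(p, FsAction.READ);   // throws AccessControlException if denied right now
      return fs.open(p);             // may still fail if permissions changed in between
    }

    /** Preferred: attempt the operation and handle the denial directly. */
    static FSDataInputStream openIfReadable(FileSystem fs, Path p) throws IOException {
      try {
        return fs.open(p);
      } catch (AccessControlException ace) {
        return null;                 // caller treats null as "not readable"
      }
    }
  }
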
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Aug  1 01:29:49 2014
@@ -174,6 +174,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@@ -320,6 +322,9 @@ public class ClientNamenodeProtocolServe
   private static final RemoveXAttrResponseProto
     VOID_REMOVEXATTR_RESPONSE = RemoveXAttrResponseProto.getDefaultInstance();
 
+  private static final CheckAccessResponseProto
+    VOID_CHECKACCESS_RESPONSE = CheckAccessResponseProto.getDefaultInstance();
+
   /**
    * Constructor
    * 
@@ -1338,4 +1343,15 @@ public class ClientNamenodeProtocolServe
     }
     return VOID_REMOVEXATTR_RESPONSE;
   }
+
+  @Override
+  public CheckAccessResponseProto checkAccess(RpcController controller,
+     CheckAccessRequestProto req) throws ServiceException {
+    try {
+      server.checkAccess(req.getPath(), PBHelper.convert(req.getMode()));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return VOID_CHECKACCESS_RESPONSE;
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Fri Aug  1 01:29:49 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -144,6 +145,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@@ -1346,4 +1348,15 @@ public class ClientNamenodeProtocolTrans
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public void checkAccess(String path, FsAction mode) throws IOException {
+    CheckAccessRequestProto req = CheckAccessRequestProto.newBuilder()
+        .setPath(path).setMode(PBHelper.convert(mode)).build();
+    try {
+      rpcProxy.checkAccess(null, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Aug  1 01:29:49 2014
@@ -352,15 +352,19 @@ public class PBHelper {
     return BlockWithLocationsProto.newBuilder()
         .setBlock(convert(blk.getBlock()))
         .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
-        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build();
+        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
+        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
+        .build();
   }
 
   public static BlockWithLocations convert(BlockWithLocationsProto b) {
     final List<String> datanodeUuids = b.getDatanodeUuidsList();
     final List<String> storageUuids = b.getStorageUuidsList();
+    final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
     return new BlockWithLocations(convert(b.getBlock()),
         datanodeUuids.toArray(new String[datanodeUuids.size()]),
-        storageUuids.toArray(new String[storageUuids.size()]));
+        storageUuids.toArray(new String[storageUuids.size()]),
+        convertStorageTypes(storageTypes, storageUuids.size()));
   }
 
   public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
@@ -2111,11 +2115,11 @@ public class PBHelper {
     return castEnum(v, XATTR_NAMESPACE_VALUES);
   }
 
-  private static FsActionProto convert(FsAction v) {
+  public static FsActionProto convert(FsAction v) {
     return FsActionProto.valueOf(v != null ? v.ordinal() : 0);
   }
 
-  private static FsAction convert(FsActionProto v) {
+  public static FsAction convert(FsActionProto v) {
     return castEnum(v, FSACTION_VALUES);
   }
 

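The FsAction converters are widened from private to public so the new checkAccess translators above can reuse them. As the diff shows, the mapping is ordinal-based (FsActionProto.valueOf(v.ordinal())), which assumes FsAction and FsActionProto stay declared in the same order. A small round-trip sketch:

  import org.apache.hadoop.fs.permission.FsAction;
  import org.apache.hadoop.hdfs.protocolPB.PBHelper;

  public class FsActionRoundTrip {
    public static void main(String[] args) {
      for (FsAction a : FsAction.values()) {
        // convert(FsAction) -> FsActionProto, convert(FsActionProto) -> FsAction
        FsAction back = PBHelper.convert(PBHelper.convert(a));
        if (back != a) {
          throw new AssertionError("ordinal mismatch for " + a);
        }
      }
      System.out.println("FsAction <-> FsActionProto round-trip OK");
    }
  }
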
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Aug  1 01:29:49 2014
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumMap;
 import java.util.Formatter;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -45,6 +46,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -78,16 +80,21 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.base.Preconditions;
+
 /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
  * when some datanodes become full or when new empty nodes join the cluster.
  * The tool is deployed as an application program that can be run by the 
@@ -188,7 +195,9 @@ import org.apache.hadoop.util.ToolRunner
 @InterfaceAudience.Private
 public class Balancer {
   static final Log LOG = LogFactory.getLog(Balancer.class);
-  final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+  final private static long GB = 1L << 30; //1GB
+  final private static long MAX_SIZE_TO_MOVE = 10*GB;
+  final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
   private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
 
   /** The maximum number of concurrent blocks moves for 
@@ -203,34 +212,38 @@ public class Balancer {
       + "\n\t[-policy <policy>]\tthe balancing policy: "
       + BalancingPolicy.Node.INSTANCE.getName() + " or "
       + BalancingPolicy.Pool.INSTANCE.getName()
-      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
+      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity"
+      + "\n\t[-exclude [-f <hosts-file> | comma-sperated list of hosts]]"
+      + "\tExcludes the specified datanodes."
+      + "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]"
+      + "\tIncludes only the specified datanodes.";
   
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
   private final SaslDataTransferClient saslClient;
   private final double threshold;
+  // set of data nodes to be excluded from balancing operations.
+  Set<String> nodesToBeExcluded;
+  //Restrict balancing to the following nodes.
+  Set<String> nodesToBeIncluded;
   
   // all data node lists
-  private final Collection<Source> overUtilizedDatanodes
-                               = new LinkedList<Source>();
-  private final Collection<Source> aboveAvgUtilizedDatanodes
-                               = new LinkedList<Source>();
-  private final Collection<BalancerDatanode> belowAvgUtilizedDatanodes
-                               = new LinkedList<BalancerDatanode>();
-  private final Collection<BalancerDatanode> underUtilizedDatanodes
-                               = new LinkedList<BalancerDatanode>();
-  
-  private final Collection<Source> sources
-                               = new HashSet<Source>();
-  private final Collection<BalancerDatanode> targets
-                               = new HashSet<BalancerDatanode>();
+  private final Collection<Source> overUtilized = new LinkedList<Source>();
+  private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
+  private final Collection<BalancerDatanode.StorageGroup> belowAvgUtilized
+      = new LinkedList<BalancerDatanode.StorageGroup>();
+  private final Collection<BalancerDatanode.StorageGroup> underUtilized
+      = new LinkedList<BalancerDatanode.StorageGroup>();
+  
+  private final Collection<Source> sources = new HashSet<Source>();
+  private final Collection<BalancerDatanode.StorageGroup> targets
+      = new HashSet<BalancerDatanode.StorageGroup>();
   
   private final Map<Block, BalancerBlock> globalBlockList
                  = new HashMap<Block, BalancerBlock>();
   private final MovedBlocks movedBlocks = new MovedBlocks();
-  /** Map (datanodeUuid -> BalancerDatanodes) */
-  private final Map<String, BalancerDatanode> datanodeMap
-      = new HashMap<String, BalancerDatanode>();
+  /** Map (datanodeUuid,storageType -> StorageGroup) */
+  private final StorageGroupMap storageGroupMap = new StorageGroupMap();
   
   private NetworkTopology cluster;
 
@@ -238,12 +251,39 @@ public class Balancer {
   private final ExecutorService dispatcherExecutor;
   private final int maxConcurrentMovesPerNode;
 
+
+  private static class StorageGroupMap {
+    private static String toKey(String datanodeUuid, StorageType storageType) {
+      return datanodeUuid + ":" + storageType;
+    }
+
+    private final Map<String, BalancerDatanode.StorageGroup> map
+        = new HashMap<String, BalancerDatanode.StorageGroup>();
+    
+    BalancerDatanode.StorageGroup get(String datanodeUuid, StorageType storageType) {
+      return map.get(toKey(datanodeUuid, storageType));
+    }
+    
+    void put(BalancerDatanode.StorageGroup g) {
+      final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType);
+      final BalancerDatanode.StorageGroup existing = map.put(key, g);
+      Preconditions.checkState(existing == null);
+    }
+    
+    int size() {
+      return map.size();
+    }
+    
+    void clear() {
+      map.clear();
+    }
+  }
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
     private BalancerBlock block;
     private Source source;
     private BalancerDatanode proxySource;
-    private BalancerDatanode target;
+    private BalancerDatanode.StorageGroup target;
     
     /** constructor */
     private PendingBlockMove() {
@@ -254,7 +294,7 @@ public class Balancer {
       final Block b = block.getBlock();
       return b + " with size=" + b.getNumBytes() + " from "
           + source.getDisplayName() + " to " + target.getDisplayName()
-          + " through " + proxySource.getDisplayName();
+          + " through " + proxySource.datanode;
     }
 
     /* choose a block & a proxy source for this pendingMove 
@@ -306,20 +346,20 @@ public class Balancer {
       final DatanodeInfo targetDN = target.getDatanode();
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
-        for (BalancerDatanode loc : block.getLocations()) {
+        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
           if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) {
             return true;
           }
         }
       }
       // check if there is replica which is on the same rack with the target
-      for (BalancerDatanode loc : block.getLocations()) {
+      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
         if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
           return true;
         }
       }
       // find out a non-busy replica
-      for (BalancerDatanode loc : block.getLocations()) {
+      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
         if (addTo(loc)) {
           return true;
         }
@@ -327,8 +367,9 @@ public class Balancer {
       return false;
     }
     
-    // add a BalancerDatanode as proxy source for specific block movement
-    private boolean addTo(BalancerDatanode bdn) {
+    /** add to a proxy source for specific block movement */
+    private boolean addTo(BalancerDatanode.StorageGroup g) {
+      final BalancerDatanode bdn = g.getBalancerDatanode();
       if (bdn.addPendingBlock(this)) {
         proxySource = bdn;
         return true;
@@ -344,7 +385,7 @@ public class Balancer {
       DataInputStream in = null;
       try {
         sock.connect(
-            NetUtils.createSocketAddr(target.datanode.getXferAddr()),
+            NetUtils.createSocketAddr(target.getDatanode().getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
         /* Unfortunately we don't have a good way to know if the Datanode is
          * taking a really long time to move a block, OR something has
@@ -361,7 +402,7 @@ public class Balancer {
         ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
         Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, nnc, accessToken, target.datanode);
+          unbufIn, nnc, accessToken, target.getDatanode());
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
@@ -381,14 +422,14 @@ public class Balancer {
          * gets out of sync with work going on in datanode.
          */
         proxySource.activateDelay(DELAY_AFTER_ERROR);
-        target.activateDelay(DELAY_AFTER_ERROR);
+        target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR);
       } finally {
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
         IOUtils.closeSocket(sock);
         
         proxySource.removePendingBlock(this);
-        target.removePendingBlock(this);
+        target.getBalancerDatanode().removePendingBlock(this);
 
         synchronized (this ) {
           reset();
@@ -404,7 +445,7 @@ public class Balancer {
         StorageType storageType, 
         Token<BlockTokenIdentifier> accessToken) throws IOException {
       new Sender(out).replaceBlock(eb, storageType, accessToken,
-          source.getStorageID(), proxySource.getDatanode());
+          source.getDatanode().getDatanodeUuid(), proxySource.datanode);
     }
     
     /* Receive a block copy response from the input stream */ 
@@ -444,8 +485,9 @@ public class Balancer {
   /* A class for keeping track of blocks in the Balancer */
   static private class BalancerBlock {
     private final Block block; // the block
-    private final List<BalancerDatanode> locations
-            = new ArrayList<BalancerDatanode>(3); // its locations
+    /** The locations of the replicas of the block. */
+    private final List<BalancerDatanode.StorageGroup> locations
+        = new ArrayList<BalancerDatanode.StorageGroup>(3);
     
     /* Constructor */
     private BalancerBlock(Block block) {
@@ -458,20 +500,19 @@ public class Balancer {
     }
     
     /* add a location */
-    private synchronized void addLocation(BalancerDatanode datanode) {
-      if (!locations.contains(datanode)) {
-        locations.add(datanode);
+    private synchronized void addLocation(BalancerDatanode.StorageGroup g) {
+      if (!locations.contains(g)) {
+        locations.add(g);
       }
     }
     
-    /* Return if the block is located on <code>datanode</code> */
-    private synchronized boolean isLocatedOnDatanode(
-        BalancerDatanode datanode) {
-      return locations.contains(datanode);
+    /** @return if the block is located on the given storage group. */
+    private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) {
+      return locations.contains(g);
     }
     
     /* Return its locations */
-    private synchronized List<BalancerDatanode> getLocations() {
+    private synchronized List<BalancerDatanode.StorageGroup> getLocations() {
       return locations;
     }
     
@@ -488,37 +529,84 @@ public class Balancer {
   
   /* The class represents a desired move of bytes between two nodes 
    * and the target.
-   * An object of this class is stored in a source node. 
+   * An object of this class is stored in a source. 
    */
-  static private class NodeTask {
-    private final BalancerDatanode datanode; //target node
+  static private class Task {
+    private final BalancerDatanode.StorageGroup target;
     private long size;  //bytes scheduled to move
     
     /* constructor */
-    private NodeTask(BalancerDatanode datanode, long size) {
-      this.datanode = datanode;
+    private Task(BalancerDatanode.StorageGroup target, long size) {
+      this.target = target;
       this.size = size;
     }
-    
-    /* Get the node */
-    private BalancerDatanode getDatanode() {
-      return datanode;
-    }
-    
-    /* Get the number of bytes that need to be moved */
-    private long getSize() {
-      return size;
-    }
   }
   
   
   /* A class that keeps track of a datanode in Balancer */
   private static class BalancerDatanode {
-    final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
+    
+    /** A group of storages in a datanode with the same storage type. */
+    private class StorageGroup {
+      final StorageType storageType;
+      final double utilization;
+      final long maxSize2Move;
+      private long scheduledSize = 0L;
+
+      private StorageGroup(StorageType storageType, double utilization,
+          long maxSize2Move) {
+        this.storageType = storageType;
+        this.utilization = utilization;
+        this.maxSize2Move = maxSize2Move;
+      }
+      
+      BalancerDatanode getBalancerDatanode() {
+        return BalancerDatanode.this;
+      }
+
+      DatanodeInfo getDatanode() {
+        return BalancerDatanode.this.datanode;
+      }
+
+      /** Decide if still need to move more bytes */
+      protected synchronized boolean hasSpaceForScheduling() {
+        return availableSizeToMove() > 0L;
+      }
+
+      /** @return the total number of bytes that need to be moved */
+      synchronized long availableSizeToMove() {
+        return maxSize2Move - scheduledSize;
+      }
+      
+      /** increment scheduled size */
+      synchronized void incScheduledSize(long size) {
+        scheduledSize += size;
+      }
+      
+      /** @return scheduled size */
+      synchronized long getScheduledSize() {
+        return scheduledSize;
+      }
+      
+      /** Reset scheduled size to zero. */
+      synchronized void resetScheduledSize() {
+        scheduledSize = 0L;
+      }
+
+      /** @return the name for display */
+      String getDisplayName() {
+        return datanode + ":" + storageType;
+      }
+      
+      @Override
+      public String toString() {
+        return "" + utilization;
+      }
+    }
+
     final DatanodeInfo datanode;
-    final double utilization;
-    final long maxSize2Move;
-    private long scheduledSize = 0L;
+    final EnumMap<StorageType, StorageGroup> storageMap
+        = new EnumMap<StorageType, StorageGroup>(StorageType.class);
     protected long delayUntil = 0L;
     //  blocks being moved but not confirmed yet
     private final List<PendingBlockMove> pendingBlocks;
@@ -526,78 +614,38 @@ public class Balancer {
     
     @Override
     public String toString() {
-      return getClass().getSimpleName() + "[" + datanode
-          + ", utilization=" + utilization + "]";
+      return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
     }
 
     /* Constructor 
      * Depending on avgutil & threshold, calculate maximum bytes to move 
      */
-    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold,
-        int maxConcurrentMoves) {
-      datanode = node;
-      utilization = policy.getUtilization(node);
-      final double avgUtil = policy.getAvgUtilization();
-      long maxSizeToMove;
-
-      if (utilization >= avgUtil+threshold
-          || utilization <= avgUtil-threshold) { 
-        maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
-      } else {
-        maxSizeToMove = 
-          (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100);
-      }
-      if (utilization < avgUtil ) {
-        maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
-      }
-      this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+    private BalancerDatanode(DatanodeStorageReport report,
+        double threshold, int maxConcurrentMoves) {
+      this.datanode = report.getDatanodeInfo();
       this.maxConcurrentMoves = maxConcurrentMoves;
       this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
     }
     
-    /** Get the datanode */
-    protected DatanodeInfo getDatanode() {
-      return datanode;
-    }
-    
-    /** Get the name of the datanode */
-    protected String getDisplayName() {
-      return datanode.toString();
-    }
-    
-    /* Get the storage id of the datanode */
-    protected String getStorageID() {
-      return datanode.getDatanodeUuid();
-    }
-    
-    /** Decide if still need to move more bytes */
-    protected synchronized boolean hasSpaceForScheduling() {
-      return scheduledSize<maxSize2Move;
-    }
-
-    /** Return the total number of bytes that need to be moved */
-    protected synchronized long availableSizeToMove() {
-      return maxSize2Move-scheduledSize;
-    }
-    
-    /** increment scheduled size */
-    protected synchronized void incScheduledSize(long size) {
-      scheduledSize += size;
-    }
-    
-    /** decrement scheduled size */
-    protected synchronized void decScheduledSize(long size) {
-      scheduledSize -= size;
-    }
-    
-    /** get scheduled size */
-    protected synchronized long getScheduledSize(){
-      return scheduledSize;
-    }
-    
-    /** get scheduled size */
-    protected synchronized void setScheduledSize(long size){
-      scheduledSize = size;
+    private void put(StorageType storageType, StorageGroup g) {
+      final StorageGroup existing = storageMap.put(storageType, g);
+      Preconditions.checkState(existing == null);
+    }
+
+    StorageGroup addStorageGroup(StorageType storageType, double utilization,
+        long maxSize2Move) {
+      final StorageGroup g = new StorageGroup(storageType, utilization,
+          maxSize2Move);
+      put(storageType, g);
+      return g;
+    }
+
+    Source addSource(StorageType storageType, double utilization,
+        long maxSize2Move, Balancer balancer) {
+      final Source s = balancer.new Source(storageType, utilization,
+          maxSize2Move, this);
+      put(storageType, s);
+      return s;
     }
 
     synchronized private void activateDelay(long delta) {
@@ -640,9 +688,9 @@ public class Balancer {
       return pendingBlocks.remove(pendingBlock);
     }
   }
-  
+
   /** A node that can be the sources of a block move */
-  private class Source extends BalancerDatanode {
+  private class Source extends BalancerDatanode.StorageGroup {
     
     /* A thread that initiates a block move 
      * and waits for block move to complete */
@@ -653,7 +701,7 @@ public class Balancer {
       }
     }
     
-    private final ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2);
+    private final List<Task> tasks = new ArrayList<Task>(2);
     private long blocksToReceive = 0L;
     /* source blocks point to balancerBlocks in the global list because
      * we want to keep one copy of a block in balancer and be aware that
@@ -663,17 +711,17 @@ public class Balancer {
             = new ArrayList<BalancerBlock>();
     
     /* constructor */
-    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold,
-        int maxConcurrentMoves) {
-      super(node, policy, threshold, maxConcurrentMoves);
+    private Source(StorageType storageType, double utilization,
+        long maxSize2Move, BalancerDatanode dn) {
+      dn.super(storageType, utilization, maxSize2Move);
     }
     
-    /** Add a node task */
-    private void addNodeTask(NodeTask task) {
-      assert (task.datanode != this) :
-        "Source and target are the same " + datanode;
-      incScheduledSize(task.getSize());
-      nodeTasks.add(task);
+    /** Add a task */
+    private void addTask(Task task) {
+      Preconditions.checkState(task.target != this,
+          "Source and target are the same storage group " + getDisplayName());
+      incScheduledSize(task.size);
+      tasks.add(task);
     }
     
     /* Return an iterator to this source's blocks */
@@ -686,8 +734,10 @@ public class Balancer {
      * Return the total size of the received blocks in the number of bytes.
      */
     private long getBlockList() throws IOException {
-      BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode, 
-        Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
+      final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
+      final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(
+          getDatanode(), size).getBlocks();
+
       long bytesReceived = 0;
       for (BlockWithLocations blk : newBlocks) {
         bytesReceived += blk.getBlock().getNumBytes();
@@ -703,10 +753,13 @@ public class Balancer {
         
           synchronized (block) {
             // update locations
-            for (String datanodeUuid : blk.getDatanodeUuids()) {
-              final BalancerDatanode d = datanodeMap.get(datanodeUuid);
-              if (d != null) { // not an unknown datanode
-                block.addLocation(d);
+            final String[] datanodeUuids = blk.getDatanodeUuids();
+            final StorageType[] storageTypes = blk.getStorageTypes();
+            for (int i = 0; i < datanodeUuids.length; i++) {
+              final BalancerDatanode.StorageGroup g = storageGroupMap.get(
+                  datanodeUuids[i], storageTypes[i]);
+              if (g != null) { // not unknown
+                block.addLocation(g);
               }
             }
           }
@@ -721,8 +774,8 @@ public class Balancer {
 
     /* Decide if the given block is a good candidate to move or not */
     private boolean isGoodBlockCandidate(BalancerBlock block) {
-      for (NodeTask nodeTask : nodeTasks) {
-        if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) {
+      for (Task t : tasks) {
+        if (Balancer.this.isGoodBlockCandidate(this, t.target, block)) {
           return true;
         }
       }
@@ -737,20 +790,20 @@ public class Balancer {
      * The block should be dispatched immediately after this method is returned.
      */
     private PendingBlockMove chooseNextBlockToMove() {
-      for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) {
-        NodeTask task = tasks.next();
-        BalancerDatanode target = task.getDatanode();
+      for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
+        final Task task = i.next();
+        final BalancerDatanode target = task.target.getBalancerDatanode();
         PendingBlockMove pendingBlock = new PendingBlockMove();
         if (target.addPendingBlock(pendingBlock)) { 
           // target is not busy, so do a tentative block allocation
           pendingBlock.source = this;
-          pendingBlock.target = target;
+          pendingBlock.target = task.target;
           if ( pendingBlock.chooseBlockAndProxy() ) {
             long blockSize = pendingBlock.block.getNumBytes();
-            decScheduledSize(blockSize);
+            incScheduledSize(-blockSize);
             task.size -= blockSize;
             if (task.size == 0) {
-              tasks.remove();
+              i.remove();
             }
             return pendingBlock;
           } else {
@@ -824,7 +877,7 @@ public class Balancer {
           // in case no blocks can be moved for source node's task,
           // jump out of while-loop after 5 iterations.
           if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
-            setScheduledSize(0);
+            resetScheduledSize();
           }
         }
         
@@ -869,6 +922,8 @@ public class Balancer {
   Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
     this.threshold = p.threshold;
     this.policy = p.policy;
+    this.nodesToBeExcluded = p.nodesToBeExcluded;
+    this.nodesToBeIncluded = p.nodesToBeIncluded;
     this.nnc = theblockpool;
     cluster = NetworkTopology.getInstance(conf);
 
@@ -889,95 +944,154 @@ public class Balancer {
         IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
   }
   
-  /* Given a data node set, build a network topology and decide
-   * over-utilized datanodes, above average utilized datanodes, 
-   * below average utilized datanodes, and underutilized datanodes. 
-   * The input data node set is shuffled before the datanodes 
-   * are put into the over-utilized datanodes, above average utilized
-   * datanodes, below average utilized datanodes, and
-   * underutilized datanodes lists. This will add some randomness
-   * to the node matching later on.
-   * 
+  
+  private static long getCapacity(DatanodeStorageReport report, StorageType t) {
+    long capacity = 0L;
+    for(StorageReport r : report.getStorageReports()) {
+      if (r.getStorage().getStorageType() == t) {
+        capacity += r.getCapacity();
+      }
+    }
+    return capacity;
+  }
+
+  private static long getRemaining(DatanodeStorageReport report, StorageType t) {
+    long remaining = 0L;
+    for(StorageReport r : report.getStorageReports()) {
+      if (r.getStorage().getStorageType() == t) {
+        remaining += r.getRemaining();
+      }
+    }
+    return remaining;
+  }
+
+  private boolean shouldIgnore(DatanodeInfo dn) {
+    //ignore decommissioned nodes
+    final boolean decommissioned = dn.isDecommissioned();
+    //ignore decommissioning nodes
+    final boolean decommissioning = dn.isDecommissionInProgress();
+    // ignore nodes in exclude list
+    final boolean excluded = Util.shouldBeExcluded(nodesToBeExcluded, dn);
+    // ignore nodes not in the include list (if include list is not empty)
+    final boolean notIncluded = !Util.shouldBeIncluded(nodesToBeIncluded, dn);
+    
+    if (decommissioned || decommissioning || excluded || notIncluded) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Excluding datanode " + dn + ": decommissioned=" + decommissioned
+            + ", decommissioning=" + decommissioning + ", excluded=" + excluded
+            + ", notIncluded=" + notIncluded);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Given a datanode storage set, build a network topology and decide
+   * over-utilized storages, above average utilized storages, 
+   * below average utilized storages, and underutilized storages. 
+   * The input datanode storage set is shuffled in order to add randomness
+   * to the storage matching later on.
+   *
    * @return the total number of bytes that are 
    *                needed to move to make the cluster balanced.
-   * @param datanodes a set of datanodes
+   * @param reports a set of datanode storage reports
    */
-  private long initNodes(DatanodeInfo[] datanodes) {
+  private long init(DatanodeStorageReport[] reports) {
     // compute average utilization
-    for (DatanodeInfo datanode : datanodes) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
-        continue; // ignore decommissioning or decommissioned nodes
+    for (DatanodeStorageReport r : reports) {
+      if (shouldIgnore(r.getDatanodeInfo())) {
+        continue; 
       }
-      policy.accumulateSpaces(datanode);
+      policy.accumulateSpaces(r);
     }
     policy.initAvgUtilization();
 
-    /*create network topology and all data node lists: 
-     * overloaded, above-average, below-average, and underloaded
-     * we alternates the accessing of the given datanodes array either by
-     * an increasing order or a decreasing order.
-     */  
+    // create network topology and classify utilization collections: 
+    //   over-utilized, above-average, below-average and under-utilized.
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
-    for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+    for(DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
+      final DatanodeInfo datanode = r.getDatanodeInfo();
+      if (shouldIgnore(datanode)) {
         continue; // ignore decommissioning or decommissioned nodes
       }
       cluster.add(datanode);
-      BalancerDatanode datanodeS;
-      final double avg = policy.getAvgUtilization();
-      if (policy.getUtilization(datanode) >= avg) {
-        datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode);
-        if (isAboveAvgUtilized(datanodeS)) {
-          this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
-        } else {
-          assert(isOverUtilized(datanodeS)) :
-            datanodeS.getDisplayName()+ "is not an overUtilized node";
-          this.overUtilizedDatanodes.add((Source)datanodeS);
-          overLoadedBytes += (long)((datanodeS.utilization-avg
-              -threshold)*datanodeS.datanode.getCapacity()/100.0);
+
+      final BalancerDatanode dn = new BalancerDatanode(r, underLoadedBytes,
+          maxConcurrentMovesPerNode);
+      for(StorageType t : StorageType.asList()) {
+        final Double utilization = policy.getUtilization(r, t);
+        if (utilization == null) { // datanode does not have such storage type 
+          continue;
         }
-      } else {
-        datanodeS = new BalancerDatanode(datanode, policy, threshold,
-            maxConcurrentMovesPerNode);
-        if ( isBelowOrEqualAvgUtilized(datanodeS)) {
-          this.belowAvgUtilizedDatanodes.add(datanodeS);
+        
+        final long capacity = getCapacity(r, t);
+        final double utilizationDiff = utilization - policy.getAvgUtilization(t);
+        final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
+        final long maxSize2Move = computeMaxSize2Move(capacity,
+            getRemaining(r, t), utilizationDiff, threshold);
+
+        final BalancerDatanode.StorageGroup g;
+        if (utilizationDiff > 0) {
+          final Source s = dn.addSource(t, utilization, maxSize2Move, this);
+          if (thresholdDiff <= 0) { // within threshold
+            aboveAvgUtilized.add(s);
+          } else {
+            overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            overUtilized.add(s);
+          }
+          g = s;
         } else {
-          assert isUnderUtilized(datanodeS) : "isUnderUtilized("
-              + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS)
-              + ", utilization=" + datanodeS.utilization; 
-          this.underUtilizedDatanodes.add(datanodeS);
-          underLoadedBytes += (long)((avg-threshold-
-              datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
+          g = dn.addStorageGroup(t, utilization, maxSize2Move);
+          if (thresholdDiff <= 0) { // within threshold
+            belowAvgUtilized.add(g);
+          } else {
+            underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            underUtilized.add(g);
+          }
         }
+        storageGroupMap.put(g);
       }
-      datanodeMap.put(datanode.getDatanodeUuid(), datanodeS);
     }
 
-    //logging
-    logNodes();
+    logUtilizationCollections();
     
-    assert (this.datanodeMap.size() == 
-      overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
-      aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
-      : "Mismatched number of datanodes";
+    Preconditions.checkState(storageGroupMap.size() == overUtilized.size()
+        + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(),
+        "Mismatched number of storage groups");
     
     // return number of bytes to be moved in order to make the cluster balanced
     return Math.max(overLoadedBytes, underLoadedBytes);
   }
 
+  private static long computeMaxSize2Move(final long capacity, final long remaining,
+      final double utilizationDiff, final double threshold) {
+    final double diff = Math.min(threshold, Math.abs(utilizationDiff));
+    long maxSizeToMove = precentage2bytes(diff, capacity);
+    if (utilizationDiff < 0) {
+      maxSizeToMove = Math.min(remaining, maxSizeToMove);
+    }
+    return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+  }
+
+  private static long precentage2bytes(double precentage, long capacity) {
+    Preconditions.checkArgument(precentage >= 0,
+        "precentage = " + precentage + " < 0");
+    return (long)(precentage * capacity / 100.0);
+  }
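
In other words, each (datanode, storage type) pair is classified by comparing its utilization against the cluster average for that storage type, and the over/under-loaded byte counts come from converting the excess percentage back into bytes. A self-contained sketch of that arithmetic follows; the class, enum and method names are illustrative only and are not part of this patch.

// Illustrative sketch of the per-storage-group classification done in init() above.
public class UtilizationClassifierSketch {
  enum Group { OVER_UTILIZED, ABOVE_AVERAGE, BELOW_AVERAGE, UNDER_UTILIZED }

  static Group classify(double utilization, double avgUtilization, double threshold) {
    final double utilizationDiff = utilization - avgUtilization;
    final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
    if (utilizationDiff > 0) {          // candidate source
      return thresholdDiff <= 0 ? Group.ABOVE_AVERAGE : Group.OVER_UTILIZED;
    } else {                            // candidate target
      return thresholdDiff <= 0 ? Group.BELOW_AVERAGE : Group.UNDER_UTILIZED;
    }
  }

  /** Convert a utilization percentage of a capacity into bytes. */
  static long percentageToBytes(double percentage, long capacity) {
    return (long) (percentage * capacity / 100.0);
  }

  public static void main(String[] args) {
    // A 1 TB storage at 78% utilization, cluster average 60%, threshold 10%:
    final long capacity = 1L << 40;
    System.out.println(classify(78.0, 60.0, 10.0));                   // OVER_UTILIZED
    // The 8% above (avg + threshold), i.e. what overLoadedBytes accumulates:
    System.out.println(percentageToBytes(78.0 - 60.0 - 10.0, capacity));
  }
}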
+
   /* log the over utilized & under utilized nodes */
-  private void logNodes() {
-    logNodes("over-utilized", overUtilizedDatanodes);
+  private void logUtilizationCollections() {
+    logUtilizationCollection("over-utilized", overUtilized);
     if (LOG.isTraceEnabled()) {
-      logNodes("above-average", aboveAvgUtilizedDatanodes);
-      logNodes("below-average", belowAvgUtilizedDatanodes);
+      logUtilizationCollection("above-average", aboveAvgUtilized);
+      logUtilizationCollection("below-average", belowAvgUtilized);
     }
-    logNodes("underutilized", underUtilizedDatanodes);
+    logUtilizationCollection("underutilized", underUtilized);
   }
 
-  private static <T extends BalancerDatanode> void logNodes(
-      String name, Collection<T> nodes) {
-    LOG.info(nodes.size() + " " + name + ": " + nodes);
+  private static <T extends BalancerDatanode.StorageGroup>
+      void logUtilizationCollection(String name, Collection<T> items) {
+    LOG.info(items.size() + " " + name + ": " + items);
   }
 
   /** A matcher interface for matching nodes. */
@@ -1013,26 +1127,24 @@ public class Balancer {
   /**
    * Decide all <source, target> pairs and
    * the number of bytes to move from a source to a target
-   * Maximum bytes to be moved per node is
-   * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
-   * Return total number of bytes to move in this iteration
+   * Maximum bytes to be moved per storage group is
+   * min(one bandwidth's worth of bytes, MAX_SIZE_TO_MOVE).
+   * @return total number of bytes to move in this iteration
    */
-  private long chooseNodes() {
+  private long chooseStorageGroups() {
     // First, match nodes on the same node group if cluster is node group aware
     if (cluster.isNodeGroupAware()) {
-      chooseNodes(SAME_NODE_GROUP);
+      chooseStorageGroups(SAME_NODE_GROUP);
     }
     
     // Then, match nodes on the same rack
-    chooseNodes(SAME_RACK);
+    chooseStorageGroups(SAME_RACK);
     // At last, match all remaining nodes
-    chooseNodes(ANY_OTHER);
+    chooseStorageGroups(ANY_OTHER);
     
-    assert (datanodeMap.size() >= sources.size()+targets.size())
-      : "Mismatched number of datanodes (" +
-      datanodeMap.size() + " total, " +
-      sources.size() + " sources, " +
-      targets.size() + " targets)";
+    Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
+        "Mismatched number of datanodes (" + storageGroupMap.size() + " < "
+            + sources.size() + " sources, " + targets.size() + " targets)");
 
     long bytesToMove = 0L;
     for (Source src : sources) {
@@ -1042,25 +1154,25 @@ public class Balancer {
   }
 
   /** Decide all <source, target> pairs according to the matcher. */
-  private void chooseNodes(final Matcher matcher) {
+  private void chooseStorageGroups(final Matcher matcher) {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
-    chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
+    chooseStorageGroups(overUtilized, underUtilized, matcher);
     
     /* match each remaining overutilized datanode (source) to 
      * below average utilized datanodes (targets).
      * Note only overutilized datanodes that haven't had that max bytes to move
      * satisfied in step 1 are selected
      */
-    chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
+    chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
 
     /* match each remaining underutilized datanode (target) to 
      * above average utilized datanodes (source).
      * Note only underutilized datanodes that have not had that max bytes to
      * move satisfied in step 1 are selected.
      */
-    chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
+    chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
   }
 
   /**
@@ -1068,13 +1180,14 @@ public class Balancer {
    * datanodes or the candidates are source nodes with (utilization > Avg), and
    * the others are target nodes with (utilization < Avg).
    */
-  private <D extends BalancerDatanode, C extends BalancerDatanode> void 
-      chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
+  private <G extends BalancerDatanode.StorageGroup,
+           C extends BalancerDatanode.StorageGroup>
+      void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
           Matcher matcher) {
-    for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
-      final D datanode = i.next();
-      for(; chooseForOneDatanode(datanode, candidates, matcher); );
-      if (!datanode.hasSpaceForScheduling()) {
+    for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
+      final G g = i.next();
+      for(; choose4One(g, candidates, matcher); );
+      if (!g.hasSpaceForScheduling()) {
         i.remove();
       }
     }
@@ -1084,18 +1197,19 @@ public class Balancer {
    * For the given datanode, choose a candidate and then schedule it.
     * @return true if a candidate is chosen; false if no candidate is chosen.
    */
-  private <C extends BalancerDatanode> boolean chooseForOneDatanode(
-      BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
+  private <C extends BalancerDatanode.StorageGroup>
+      boolean choose4One(BalancerDatanode.StorageGroup g,
+          Collection<C> candidates, Matcher matcher) {
     final Iterator<C> i = candidates.iterator();
-    final C chosen = chooseCandidate(dn, i, matcher);
-
+    final C chosen = chooseCandidate(g, i, matcher);
+  
     if (chosen == null) {
       return false;
     }
-    if (dn instanceof Source) {
-      matchSourceWithTargetToMove((Source)dn, chosen);
+    if (g instanceof Source) {
+      matchSourceWithTargetToMove((Source)g, chosen);
     } else {
-      matchSourceWithTargetToMove((Source)chosen, dn);
+      matchSourceWithTargetToMove((Source)chosen, g);
     }
     if (!chosen.hasSpaceForScheduling()) {
       i.remove();
@@ -1103,27 +1217,28 @@ public class Balancer {
     return true;
   }
   
-  private void matchSourceWithTargetToMove(
-      Source source, BalancerDatanode target) {
+  private void matchSourceWithTargetToMove(Source source,
+      BalancerDatanode.StorageGroup target) {
     long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
-    NodeTask nodeTask = new NodeTask(target, size);
-    source.addNodeTask(nodeTask);
-    target.incScheduledSize(nodeTask.getSize());
+    final Task task = new Task(target, size);
+    source.addTask(task);
+    target.incScheduledSize(task.size);
     sources.add(source);
     targets.add(target);
     LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-        +source.datanode.getName() + " to " + target.datanode.getName());
+        + source.getDisplayName() + " to " + target.getDisplayName());
   }
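
Pairing a source with a target therefore reserves min(source's remaining budget, target's remaining budget) bytes on both sides. A toy sketch of that reservation bookkeeping; SimpleGroup and reserve are illustrative names, not classes from this patch.

// Toy model of the scheduled-size bookkeeping behind matchSourceWithTargetToMove.
class SimpleGroup {
  final long maxSizeToMove;   // per-iteration budget for this storage group
  long scheduledSize;         // bytes already promised to pairings

  SimpleGroup(long maxSizeToMove) { this.maxSizeToMove = maxSizeToMove; }

  long availableSizeToMove() { return maxSizeToMove - scheduledSize; }
}

class PairingSketch {
  /** Reserve the smaller of the two remaining budgets on both sides. */
  static long reserve(SimpleGroup source, SimpleGroup target) {
    final long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
    source.scheduledSize += size;
    target.scheduledSize += size;
    return size;
  }

  public static void main(String[] args) {
    final long gb = 1L << 30;
    SimpleGroup source = new SimpleGroup(10 * gb); // wants to shed 10 GB
    SimpleGroup target = new SimpleGroup(4 * gb);  // can accept 4 GB
    System.out.println(reserve(source, target));       // 4294967296, limited by the target
    System.out.println(source.availableSizeToMove());  // 6442450944 left for other targets
  }
}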
   
   /** Choose a candidate for the given datanode. */
-  private <D extends BalancerDatanode, C extends BalancerDatanode>
-      C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
-    if (dn.hasSpaceForScheduling()) {
+  private <G extends BalancerDatanode.StorageGroup,
+           C extends BalancerDatanode.StorageGroup>
+      C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
+    if (g.hasSpaceForScheduling()) {
       for(; candidates.hasNext(); ) {
         final C c = candidates.next();
         if (!c.hasSpaceForScheduling()) {
           candidates.remove();
-        } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
+        } else if (matcher.match(cluster, g.getDatanode(), c.getDatanode())) {
           return c;
         }
       }
@@ -1177,9 +1292,10 @@ public class Balancer {
     boolean shouldWait;
     do {
       shouldWait = false;
-      for (BalancerDatanode target : targets) {
-        if (!target.isPendingQEmpty()) {
+      for (BalancerDatanode.StorageGroup target : targets) {
+        if (!target.getBalancerDatanode().isPendingQEmpty()) {
           shouldWait = true;
+          break;
         }
       }
       if (shouldWait) {
@@ -1248,12 +1364,15 @@ public class Balancer {
    * 3. doing the move does not reduce the number of racks that the block has
    */
   private boolean isGoodBlockCandidate(Source source, 
-      BalancerDatanode target, BalancerBlock block) {
+      BalancerDatanode.StorageGroup target, BalancerBlock block) {
+    if (source.storageType != target.storageType) {
+      return false;
+    }
     // check if the block is moved or not
     if (movedBlocks.contains(block)) {
-        return false;
+      return false;
     }
-    if (block.isLocatedOnDatanode(target)) {
+    if (block.isLocatedOn(target)) {
       return false;
     }
     if (cluster.isNodeGroupAware() && 
@@ -1268,8 +1387,8 @@ public class Balancer {
     } else {
       boolean notOnSameRack = true;
       synchronized (block) {
-        for (BalancerDatanode loc : block.locations) {
-          if (cluster.isOnSameRack(loc.datanode, target.datanode)) {
+        for (BalancerDatanode.StorageGroup loc : block.locations) {
+          if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
             notOnSameRack = false;
             break;
           }
@@ -1280,9 +1399,9 @@ public class Balancer {
         goodBlock = true;
       } else {
        // good if source is on the same rack as one of the replicas
-        for (BalancerDatanode loc : block.locations) {
+        for (BalancerDatanode.StorageGroup loc : block.locations) {
           if (loc != source && 
-              cluster.isOnSameRack(loc.datanode, source.datanode)) {
+              cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
             goodBlock = true;
             break;
           }
@@ -1303,25 +1422,26 @@ public class Balancer {
   * @return true if there is any replica (other than the source) on the same
   * node group as the target
    */
-  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
       BalancerBlock block, Source source) {
-    for (BalancerDatanode loc : block.locations) {
+    final DatanodeInfo targetDn = target.getDatanode();
+    for (BalancerDatanode.StorageGroup loc : block.locations) {
       if (loc != source && 
-        cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
-          return true;
-        }
+          cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
+        return true;
       }
+    }
     return false;
   }
 
   /* reset all fields in a balancer preparing for the next iteration */
   private void resetData(Configuration conf) {
     this.cluster = NetworkTopology.getInstance(conf);
-    this.overUtilizedDatanodes.clear();
-    this.aboveAvgUtilizedDatanodes.clear();
-    this.belowAvgUtilizedDatanodes.clear();
-    this.underUtilizedDatanodes.clear();
-    this.datanodeMap.clear();
+    this.overUtilized.clear();
+    this.aboveAvgUtilized.clear();
+    this.belowAvgUtilized.clear();
+    this.underUtilized.clear();
+    this.storageGroupMap.clear();
     this.sources.clear();
     this.targets.clear();  
     this.policy.reset();
@@ -1341,32 +1461,6 @@ public class Balancer {
       }
     }
   }
-  
-  /* Return true if the given datanode is overUtilized */
-  private boolean isOverUtilized(BalancerDatanode datanode) {
-    return datanode.utilization > (policy.getAvgUtilization()+threshold);
-  }
-  
-  /* Return true if the given datanode is above or equal to average utilized
-   * but not overUtilized */
-  private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
-    final double avg = policy.getAvgUtilization();
-    return (datanode.utilization <= (avg+threshold))
-        && (datanode.utilization >= avg);
-  }
-  
-  /* Return true if the given datanode is underUtilized */
-  private boolean isUnderUtilized(BalancerDatanode datanode) {
-    return datanode.utilization < (policy.getAvgUtilization()-threshold);
-  }
-
-  /* Return true if the given datanode is below average utilized 
-   * but not underUtilized */
-  private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
-    final double avg = policy.getAvgUtilization();
-    return (datanode.utilization >= (avg-threshold))
-             && (datanode.utilization <= avg);
-  }
 
   // Exit status
   enum ReturnStatus {
@@ -1394,7 +1488,8 @@ public class Balancer {
       /* get all live datanodes of a cluster and their disk usage
        * decide the number of bytes need to be moved
        */
-      final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+      final long bytesLeftToMove = init(
+          nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE));
       if (bytesLeftToMove == 0) {
         System.out.println("The cluster is balanced. Exiting...");
         return ReturnStatus.SUCCESS;
@@ -1408,7 +1503,7 @@ public class Balancer {
        * in this iteration. Maximum bytes to be moved per node is
        * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
        */
-      final long bytesToMove = chooseNodes();
+      final long bytesToMove = chooseStorageGroups();
       if (bytesToMove == 0) {
         System.out.println("No block can be moved. Exiting...");
         return ReturnStatus.NO_MOVE_BLOCK;
@@ -1526,21 +1621,101 @@ public class Balancer {
   }
 
   static class Parameters {
-    static final Parameters DEFALUT = new Parameters(
-        BalancingPolicy.Node.INSTANCE, 10.0);
+    static final Parameters DEFAULT = new Parameters(
+        BalancingPolicy.Node.INSTANCE, 10.0,
+        Collections.<String> emptySet(), Collections.<String> emptySet());
 
     final BalancingPolicy policy;
     final double threshold;
+    // exclude the nodes in this set from balancing operations
+    Set<String> nodesToBeExcluded;
+    // include only these nodes in balancing operations
+    Set<String> nodesToBeIncluded;
 
-    Parameters(BalancingPolicy policy, double threshold) {
+    Parameters(BalancingPolicy policy, double threshold,
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
       this.policy = policy;
       this.threshold = threshold;
+      this.nodesToBeExcluded = nodesToBeExcluded;
+      this.nodesToBeIncluded = nodesToBeIncluded;
     }
 
     @Override
     public String toString() {
       return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
-          + "[" + policy + ", threshold=" + threshold + "]";
+          + "[" + policy + ", threshold=" + threshold +
+          ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
+          ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
+    }
+  }
+
+  static class Util {
+
+    /**
+     * @param datanode the datanode to check
+     * @return true if the datanode is part of excludedNodes.
+     */
+    static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) {
+      return isIn(excludedNodes, datanode);
+    }
+
+    /**
+     * @param datanode the datanode to check
+     * @return true if includedNodes is empty or the datanode is part of includedNodes.
+     */
+    static boolean shouldBeIncluded(Set<String> includedNodes, DatanodeInfo datanode) {
+      return (includedNodes.isEmpty() ||
+          isIn(includedNodes, datanode));
+    }
+    /**
+     * A match is checked against the host name and the IP address, with and
+     * without the transfer port number.
+     * @param datanodeSet the set of host and host:port entries to match against
+     * @param datanode the datanode to check
+     * @return true if the datanode's transfer address matches the set of nodes.
+     */
+    private static boolean isIn(Set<String> datanodeSet, DatanodeInfo datanode) {
+      return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) ||
+          isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) ||
+          isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort());
+    }
+
+    /**
+     * Returns true if nodes contains host or host:port.
+     * @param nodes the set of host and host:port entries
+     * @param host the host name or IP address
+     * @param port the port to use in the host:port form
+     * @return true if nodes contains host or host:port
+     */
+    private static boolean isIn(Set<String> nodes, String host, int port) {
+      if (host == null) {
+        return false;
+      }
+      return (nodes.contains(host) || nodes.contains(host +":"+ port));
+    }
+
+    /**
+     * Parse a comma-separated string into a set of host names.
+     * @param string the comma-separated host list
+     * @return the set of host names
+     */
+    static Set<String> parseHostList(String string) {
+      String[] addrs = StringUtils.getTrimmedStrings(string);
+      return new HashSet<String>(Arrays.asList(addrs));
+    }
+
+    /**
+     * Read a set of host names from a file.
+     * @param fileName the name of the file to read
+     * @return the set of host names read from the file
+     */
+    static Set<String> getHostListFromFile(String fileName) {
+      Set<String> nodes = new HashSet<String>();
+      try {
+        HostsFileReader.readFileToSet("nodes", fileName, nodes);
+        return StringUtils.getTrimmedStrings(nodes);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Unable to open file: " + fileName);
+      }
     }
   }
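
The matching rule in Util accepts entries either as a bare host (host name or IP address) or as host:port, where the port is the datanode's transfer port. A self-contained sketch of that rule; HostMatcherSketch is an illustrative name, not the Balancer.Util API.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the host / host:port matching used by Util.isIn.
class HostMatcherSketch {
  static boolean matches(Set<String> nodes, String host, int xferPort) {
    if (host == null) {
      return false;
    }
    return nodes.contains(host) || nodes.contains(host + ":" + xferPort);
  }

  public static void main(String[] args) {
    Set<String> excluded = new HashSet<String>(
        Arrays.asList("dn1.example.com", "10.0.0.2:50010"));
    System.out.println(matches(excluded, "dn1.example.com", 50010)); // true, bare host entry
    System.out.println(matches(excluded, "10.0.0.2", 50010));        // true, host:port entry
    System.out.println(matches(excluded, "10.0.0.3", 50010));        // false, not listed
  }
}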
 
@@ -1578,8 +1753,10 @@ public class Balancer {
 
     /** parse command line arguments */
     static Parameters parse(String[] args) {
-      BalancingPolicy policy = Parameters.DEFALUT.policy;
-      double threshold = Parameters.DEFALUT.threshold;
+      BalancingPolicy policy = Parameters.DEFAULT.policy;
+      double threshold = Parameters.DEFAULT.threshold;
+      Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
+      Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
 
       if (args != null) {
         try {
@@ -1608,18 +1785,38 @@ public class Balancer {
                 System.err.println("Illegal policy name: " + args[i]);
                 throw e;
               }
+            } else if ("-exclude".equalsIgnoreCase(args[i])) {
+              i++;
+              if ("-f".equalsIgnoreCase(args[i])) {
+                nodesTobeExcluded = Util.getHostListFromFile(args[++i]);
+              } else {
+                nodesTobeExcluded = Util.parseHostList(args[i]);
+              }
+            } else if ("-include".equalsIgnoreCase(args[i])) {
+              i++;
+              if ("-f".equalsIgnoreCase(args[i])) {
+                nodesTobeIncluded = Util.getHostListFromFile(args[++i]);
+              } else {
+                nodesTobeIncluded = Util.parseHostList(args[i]);
+              }
             } else {
               throw new IllegalArgumentException("args = "
                   + Arrays.toString(args));
             }
           }
+          if (!nodesTobeExcluded.isEmpty() && !nodesTobeIncluded.isEmpty()) {
+            System.err.println(
+                "-exclude and -include options cannot be specified together.");
+            throw new IllegalArgumentException(
+                "-exclude and -include options cannot be specified together.");
+          }
         } catch(RuntimeException e) {
           printUsage(System.err);
           throw e;
         }
       }
       
-      return new Parameters(policy, threshold);
+      return new Parameters(policy, threshold, nodesTobeExcluded, nodesTobeIncluded);
     }
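
For illustration, the argument shapes the parser above accepts look like the snippet below; it does not call into the real parser, and the host names and file path are made up. A host list can be given inline as a comma-separated string or, with -f, read from a file, and -exclude and -include cannot be combined.

import java.util.Arrays;

// Example argument vectors for the new -exclude/-include options (illustrative only).
class BalancerArgsSketch {
  public static void main(String[] args) {
    // Exclude two datanodes, given inline as a comma-separated host list:
    String[] excludeInline = {"-exclude", "dn1.example.com,10.0.0.2:50010"};

    // Exclude the datanodes listed in a file (hypothetical path):
    String[] excludeFromFile = {"-exclude", "-f", "/tmp/balancer-excludes.txt"};

    // Balance only the listed datanodes; cannot be combined with -exclude:
    String[] includeInline = {"-include", "dn3.example.com,dn4.example.com"};

    System.out.println(Arrays.toString(excludeInline));
    System.out.println(Arrays.toString(excludeFromFile));
    System.out.println(Arrays.toString(includeInline));
  }
}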
 
     private static void printUsage(PrintStream out) {

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java Fri Aug  1 01:29:49 2014
@@ -18,7 +18,11 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.hdfs.util.EnumDoubles;
 
 /**
  * Balancing policy.
@@ -28,31 +32,43 @@ import org.apache.hadoop.hdfs.protocol.D
  */
 @InterfaceAudience.Private
 abstract class BalancingPolicy {
-  long totalCapacity;
-  long totalUsedSpace;
-  private double avgUtilization;
+  final EnumCounters<StorageType> totalCapacities
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumCounters<StorageType> totalUsedSpaces
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumDoubles<StorageType> avgUtilizations
+      = new EnumDoubles<StorageType>(StorageType.class);
 
   void reset() {
-    totalCapacity = 0L;
-    totalUsedSpace = 0L;
-    avgUtilization = 0.0;
+    totalCapacities.reset();
+    totalUsedSpaces.reset();
+    avgUtilizations.reset();
   }
 
   /** Get the policy name. */
   abstract String getName();
 
   /** Accumulate used space and capacity. */
-  abstract void accumulateSpaces(DatanodeInfo d);
+  abstract void accumulateSpaces(DatanodeStorageReport r);
 
   void initAvgUtilization() {
-    this.avgUtilization = totalUsedSpace*100.0/totalCapacity;
+    for(StorageType t : StorageType.asList()) {
+      final long capacity = totalCapacities.get(t);
+      if (capacity > 0L) {
+        final double avg  = totalUsedSpaces.get(t)*100.0/capacity;
+        avgUtilizations.set(t, avg);
+      }
+    }
   }
-  double getAvgUtilization() {
-    return avgUtilization;
+
+  double getAvgUtilization(StorageType t) {
+    return avgUtilizations.get(t);
   }
 
-  /** Return the utilization of a datanode */
-  abstract double getUtilization(DatanodeInfo d);
+  /** @return the utilization of a particular storage type of a datanode,
+   *          or null if the datanode does not have such a storage type.
+   */
+  abstract Double getUtilization(DatanodeStorageReport r, StorageType t);
   
   @Override
   public String toString() {
@@ -84,14 +100,25 @@ abstract class BalancingPolicy {
     }
 
     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getDfsUsed();  
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getDfsUsed());
+      }
     }
     
     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getDfsUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long dfsUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          dfsUsed += s.getDfsUsed();
+        }
+      }
+      return capacity == 0L? null: dfsUsed*100.0/capacity;
     }
   }
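
The per-storage-type utilization above is simply dfsUsed summed over all reports of that type, divided by the summed capacity, or null when the datanode has no capacity of that type. A small numeric sketch follows; plain arrays stand in for the StorageReport objects and the names are illustrative.

// Numeric sketch of per-storage-type utilization, mirroring getUtilization above.
class StorageUtilizationSketch {
  /** @return dfsUsed/capacity in percent, or null if there is no capacity. */
  static Double utilization(long[] capacities, long[] dfsUsed) {
    long capacity = 0L;
    long used = 0L;
    for (int i = 0; i < capacities.length; i++) {
      capacity += capacities[i];
      used += dfsUsed[i];
    }
    return capacity == 0L ? null : used * 100.0 / capacity;
  }

  public static void main(String[] args) {
    final long tb = 1L << 40;
    // Two DISK volumes on one datanode: 4 TB and 2 TB capacity, 2 TB and 1 TB used.
    System.out.println(utilization(new long[] {4 * tb, 2 * tb},
                                   new long[] {2 * tb, 1 * tb}));   // 50.0
    // A storage type this datanode does not have at all:
    System.out.println(utilization(new long[0], new long[0]));      // null
  }
}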
 
@@ -108,14 +135,25 @@ abstract class BalancingPolicy {
     }
 
     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getBlockPoolUsed();  
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getBlockPoolUsed());
+      }
     }
 
     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getBlockPoolUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long blockPoolUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          blockPoolUsed += s.getBlockPoolUsed();
+        }
+      }
+      return capacity == 0L? null: blockPoolUsed*100.0/capacity;
     }
   }
 }