You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2014/08/01 03:29:54 UTC
svn commit: r1615020 [1/2] - in
/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/ src/main/java/org/apache/hadoop/fs/
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/...
Author: szetszwo
Date: Fri Aug 1 01:29:49 2014
New Revision: 1615020
URL: http://svn.apache.org/r1615020
Log:
Merge r1609845 through r1615019 from trunk.
Added:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java
- copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/FsActionParam.java
- copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/FsActionParam.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemWithXAttrs.java
- copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemWithXAttrs.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsWithXAttrs.java
- copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsWithXAttrs.java
Modified:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/XmlEditsVisitor.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/WebHDFS.apt.vm
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsUrl.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestPermissionSymlinks.java
Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1614232-1615019
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 1 01:29:49 2014
@@ -204,9 +204,6 @@ Trunk (Unreleased)
HDFS-3549. Fix dist tar build fails in hadoop-hdfs-raid project. (Jason Lowe via daryn)
- HDFS-3482. hdfs balancer throws ArrayIndexOutOfBoundsException
- if option is specified without values. ( Madhukara Phatak via umamahesh)
-
HDFS-3614. Revert unused MiniDFSCluster constructor from HDFS-3049.
(acmurthy via eli)
@@ -346,6 +343,20 @@ Release 2.6.0 - UNRELEASED
HDFS-6739. Add getDatanodeStorageReport to ClientProtocol. (szetszwo)
+ HDFS-6665. Add tests for XAttrs in combination with viewfs.
+ (Stephen Chu via wang)
+
+ HDFS-6778. The extended attributes javadoc should simply refer to the
+ user docs. (clamb via wang)
+
+ HDFS-6570. add api that enables checking if a user has certain permissions on
+ a file. (Jitendra Pandey via cnauroth)
+
+ HDFS-6441. Add ability to exclude/include specific datanodes while
+ balancing. (Benoy Antony and Yu Li via Arpit Agarwal)
+
+ HDFS-6685. Balancer should preserve storage type of replicas. (szetszwo)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@@ -408,6 +419,16 @@ Release 2.6.0 - UNRELEASED
HDFS-6749. FSNamesystem methods should call resolvePath.
(Charles Lamb via cnauroth)
+ HDFS-4629. Using com.sun.org.apache.xml.internal.serialize.* in
+ XmlEditsVisitor.java is JVM vendor specific. Breaks IBM JAVA.
+ (Amir Sanjar via stevel)
+
+ HDFS-3482. hdfs balancer throws ArrayIndexOutOfBoundsException
+ if option is specified without values. ( Madhukara Phatak via umamahesh)
+
+ HDFS-6797. DataNode logs wrong layoutversion during upgrade. (Benoy Antony
+ via Arpit Agarwal)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -963,6 +984,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6717. JIRA HDFS-5804 breaks default nfs-gateway behavior for unsecured config
(brandonli)
+ HDFS-6768. Fix a few unit tests that use hard-coded port numbers. (Arpit
+ Agarwal)
+
BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml Fri Aug 1 01:29:49 2014
@@ -176,6 +176,11 @@ http://maven.apache.org/xsd/maven-4.0.0.
<artifactId>netty</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>xerces</groupId>
+ <artifactId>xercesImpl</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1614232-1615019
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Fri Aug 1 01:29:49 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
@@ -448,6 +449,11 @@ public class Hdfs extends AbstractFileSy
dfs.removeXAttr(getUriPath(path), name);
}
+ @Override
+ public void access(Path path, final FsAction mode) throws IOException {
+ dfs.checkAccess(getUriPath(path), mode);
+ }
+
/**
* Renew an existing delegation token.
*
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Aug 1 01:29:49 2014
@@ -122,6 +122,7 @@ import org.apache.hadoop.fs.XAttrSetFlag
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.net.Peer;
@@ -2832,6 +2833,17 @@ public class DFSClient implements java.i
}
}
+ public void checkAccess(String src, FsAction mode) throws IOException {
+ checkOpen();
+ try {
+ namenode.checkAccess(src, mode);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ UnresolvedPathException.class);
+ }
+ }
+
@Override // RemotePeerFactory
public Peer newConnectedPeer(InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Aug 1 01:29:49 2014
@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -1898,4 +1899,23 @@ public class DistributedFileSystem exten
}
}.resolve(this, absF);
}
+
+ @Override
+ public void access(Path path, final FsAction mode) throws IOException {
+ final Path absF = fixRelativePart(path);
+ new FileSystemLinkResolver<Void>() {
+ @Override
+ public Void doCall(final Path p) throws IOException {
+ dfs.checkAccess(getPathName(p), mode);
+ return null;
+ }
+
+ @Override
+ public Void next(final FileSystem fs, final Path p)
+ throws IOException {
+ fs.access(p, mode);
+ return null;
+ }
+ }.resolve(this, absF);
+ }
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java Fri Aug 1 01:29:49 2014
@@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -35,4 +38,10 @@ public enum StorageType {
public static final StorageType DEFAULT = DISK;
public static final StorageType[] EMPTY_ARRAY = {};
-}
\ No newline at end of file
+
+ private static final StorageType[] VALUES = values();
+
+ public static List<StorageType> asList() {
+ return Arrays.asList(VALUES);
+ }
+}
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Aug 1 01:29:49 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -1267,17 +1268,11 @@ public interface ClientProtocol {
/**
* Set xattr of a file or directory.
- * A regular user only can set xattr of "user" namespace.
- * A super user can set xattr of "user" and "trusted" namespace.
- * XAttr of "security" and "system" namespace is only used/exposed
- * internally to the FS impl.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
* <p/>
- * For xattr of "user" namespace, its access permissions are
- * defined by the file or directory permission bits.
- * XAttr will be set only when login user has correct permissions.
- * <p/>
- * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
- * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
* @param src file or directory
* @param xAttr <code>XAttr</code> to set
* @param flag set flag
@@ -1288,18 +1283,13 @@ public interface ClientProtocol {
throws IOException;
/**
- * Get xattrs of file or directory. Values in xAttrs parameter are ignored.
- * If xattrs is null or empty, equals getting all xattrs of the file or
- * directory.
- * Only xattrs which login user has correct permissions will be returned.
- * <p/>
- * A regular user only can get xattr of "user" namespace.
- * A super user can get xattr of "user" and "trusted" namespace.
- * XAttr of "security" and "system" namespace is only used/exposed
- * internally to the FS impl.
+ * Get xattrs of a file or directory. Values in xAttrs parameter are ignored.
+ * If xAttrs is null or empty, this is the same as getting all xattrs of the
+ * file or directory. Only those xattrs for which the logged-in user has
+ * permissions to view are returned.
* <p/>
- * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
- * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
* @param src file or directory
* @param xAttrs xAttrs to get
* @return List<XAttr> <code>XAttr</code> list
@@ -1314,13 +1304,8 @@ public interface ClientProtocol {
* Only the xattr names for which the logged in user has the permissions to
* access will be returned.
* <p/>
- * A regular user only can get xattr names from the "user" namespace.
- * A super user can get xattr names of the "user" and "trusted" namespace.
- * XAttr names of the "security" and "system" namespaces are only used/exposed
- * internally by the file system impl.
- * <p/>
- * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
- * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
* @param src file or directory
* @param xAttrs xAttrs to get
* @return List<XAttr> <code>XAttr</code> list
@@ -1332,19 +1317,33 @@ public interface ClientProtocol {
/**
* Remove xattr of a file or directory.Value in xAttr parameter is ignored.
- * Name must be prefixed with user/trusted/security/system.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
* <p/>
- * A regular user only can remove xattr of "user" namespace.
- * A super user can remove xattr of "user" and "trusted" namespace.
- * XAttr of "security" and "system" namespace is only used/exposed
- * internally to the FS impl.
- * <p/>
- * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
- * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
* @param src file or directory
* @param xAttr <code>XAttr</code> to remove
* @throws IOException
*/
@AtMostOnce
public void removeXAttr(String src, XAttr xAttr) throws IOException;
+
+ /**
+ * Checks if the user can access a path. The mode specifies which access
+ * checks to perform. If the requested permissions are granted, then the
+ * method returns normally. If access is denied, then the method throws an
+ * {@link AccessControlException}.
+ * In general, applications should avoid using this method, due to the risk of
+ * time-of-check/time-of-use race conditions. The permissions on a file may
+ * change immediately after the access call returns.
+ *
+ * @param path Path to check
+ * @param mode type of access to check
+ * @throws AccessControlException if access is denied
+ * @throws FileNotFoundException if the path does not exist
+ * @throws IOException see specific implementation
+ */
+ @Idempotent
+ public void checkAccess(String path, FsAction mode) throws IOException;
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Aug 1 01:29:49 2014
@@ -174,6 +174,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@@ -320,6 +322,9 @@ public class ClientNamenodeProtocolServe
private static final RemoveXAttrResponseProto
VOID_REMOVEXATTR_RESPONSE = RemoveXAttrResponseProto.getDefaultInstance();
+ private static final CheckAccessResponseProto
+ VOID_CHECKACCESS_RESPONSE = CheckAccessResponseProto.getDefaultInstance();
+
/**
* Constructor
*
@@ -1338,4 +1343,15 @@ public class ClientNamenodeProtocolServe
}
return VOID_REMOVEXATTR_RESPONSE;
}
+
+ @Override
+ public CheckAccessResponseProto checkAccess(RpcController controller,
+ CheckAccessRequestProto req) throws ServiceException {
+ try {
+ server.checkAccess(req.getPath(), PBHelper.convert(req.getMode()));
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return VOID_CHECKACCESS_RESPONSE;
+ }
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Fri Aug 1 01:29:49 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -144,6 +145,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@@ -1346,4 +1348,15 @@ public class ClientNamenodeProtocolTrans
throw ProtobufHelper.getRemoteException(e);
}
}
+
+ @Override
+ public void checkAccess(String path, FsAction mode) throws IOException {
+ CheckAccessRequestProto req = CheckAccessRequestProto.newBuilder()
+ .setPath(path).setMode(PBHelper.convert(mode)).build();
+ try {
+ rpcProxy.checkAccess(null, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Aug 1 01:29:49 2014
@@ -352,15 +352,19 @@ public class PBHelper {
return BlockWithLocationsProto.newBuilder()
.setBlock(convert(blk.getBlock()))
.addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
- .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build();
+ .addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
+ .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
+ .build();
}
public static BlockWithLocations convert(BlockWithLocationsProto b) {
final List<String> datanodeUuids = b.getDatanodeUuidsList();
final List<String> storageUuids = b.getStorageUuidsList();
+ final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
return new BlockWithLocations(convert(b.getBlock()),
datanodeUuids.toArray(new String[datanodeUuids.size()]),
- storageUuids.toArray(new String[storageUuids.size()]));
+ storageUuids.toArray(new String[storageUuids.size()]),
+ convertStorageTypes(storageTypes, storageUuids.size()));
}
public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
@@ -2111,11 +2115,11 @@ public class PBHelper {
return castEnum(v, XATTR_NAMESPACE_VALUES);
}
- private static FsActionProto convert(FsAction v) {
+ public static FsActionProto convert(FsAction v) {
return FsActionProto.valueOf(v != null ? v.ordinal() : 0);
}
- private static FsAction convert(FsActionProto v) {
+ public static FsAction convert(FsActionProto v) {
return castEnum(v, FSACTION_VALUES);
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Aug 1 01:29:49 2014
@@ -38,6 +38,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.EnumMap;
import java.util.Formatter;
import java.util.HashMap;
import java.util.HashSet;
@@ -45,6 +46,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -78,16 +80,21 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import com.google.common.base.Preconditions;
+
/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
* when some datanodes become full or when new empty nodes join the cluster.
* The tool is deployed as an application program that can be run by the
@@ -188,7 +195,9 @@ import org.apache.hadoop.util.ToolRunner
@InterfaceAudience.Private
public class Balancer {
static final Log LOG = LogFactory.getLog(Balancer.class);
- final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+ final private static long GB = 1L << 30; //1GB
+ final private static long MAX_SIZE_TO_MOVE = 10*GB;
+ final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
/** The maximum number of concurrent blocks moves for
@@ -203,34 +212,38 @@ public class Balancer {
+ "\n\t[-policy <policy>]\tthe balancing policy: "
+ BalancingPolicy.Node.INSTANCE.getName() + " or "
+ BalancingPolicy.Pool.INSTANCE.getName()
- + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
+ + "\n\t[-threshold <threshold>]\tPercentage of disk capacity"
+ + "\n\t[-exclude [-f <hosts-file> | comma-sperated list of hosts]]"
+ + "\tExcludes the specified datanodes."
+ + "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]"
+ + "\tIncludes only the specified datanodes.";
private final NameNodeConnector nnc;
private final BalancingPolicy policy;
private final SaslDataTransferClient saslClient;
private final double threshold;
+ // set of data nodes to be excluded from balancing operations.
+ Set<String> nodesToBeExcluded;
+ //Restrict balancing to the following nodes.
+ Set<String> nodesToBeIncluded;
// all data node lists
- private final Collection<Source> overUtilizedDatanodes
- = new LinkedList<Source>();
- private final Collection<Source> aboveAvgUtilizedDatanodes
- = new LinkedList<Source>();
- private final Collection<BalancerDatanode> belowAvgUtilizedDatanodes
- = new LinkedList<BalancerDatanode>();
- private final Collection<BalancerDatanode> underUtilizedDatanodes
- = new LinkedList<BalancerDatanode>();
-
- private final Collection<Source> sources
- = new HashSet<Source>();
- private final Collection<BalancerDatanode> targets
- = new HashSet<BalancerDatanode>();
+ private final Collection<Source> overUtilized = new LinkedList<Source>();
+ private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
+ private final Collection<BalancerDatanode.StorageGroup> belowAvgUtilized
+ = new LinkedList<BalancerDatanode.StorageGroup>();
+ private final Collection<BalancerDatanode.StorageGroup> underUtilized
+ = new LinkedList<BalancerDatanode.StorageGroup>();
+
+ private final Collection<Source> sources = new HashSet<Source>();
+ private final Collection<BalancerDatanode.StorageGroup> targets
+ = new HashSet<BalancerDatanode.StorageGroup>();
private final Map<Block, BalancerBlock> globalBlockList
= new HashMap<Block, BalancerBlock>();
private final MovedBlocks movedBlocks = new MovedBlocks();
- /** Map (datanodeUuid -> BalancerDatanodes) */
- private final Map<String, BalancerDatanode> datanodeMap
- = new HashMap<String, BalancerDatanode>();
+ /** Map (datanodeUuid,storageType -> StorageGroup) */
+ private final StorageGroupMap storageGroupMap = new StorageGroupMap();
private NetworkTopology cluster;
@@ -238,12 +251,39 @@ public class Balancer {
private final ExecutorService dispatcherExecutor;
private final int maxConcurrentMovesPerNode;
+
+ private static class StorageGroupMap {
+ private static String toKey(String datanodeUuid, StorageType storageType) {
+ return datanodeUuid + ":" + storageType;
+ }
+
+ private final Map<String, BalancerDatanode.StorageGroup> map
+ = new HashMap<String, BalancerDatanode.StorageGroup>();
+
+ BalancerDatanode.StorageGroup get(String datanodeUuid, StorageType storageType) {
+ return map.get(toKey(datanodeUuid, storageType));
+ }
+
+ void put(BalancerDatanode.StorageGroup g) {
+ final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType);
+ final BalancerDatanode.StorageGroup existing = map.put(key, g);
+ Preconditions.checkState(existing == null);
+ }
+
+ int size() {
+ return map.size();
+ }
+
+ void clear() {
+ map.clear();
+ }
+ }
/* This class keeps track of a scheduled block move */
private class PendingBlockMove {
private BalancerBlock block;
private Source source;
private BalancerDatanode proxySource;
- private BalancerDatanode target;
+ private BalancerDatanode.StorageGroup target;
/** constructor */
private PendingBlockMove() {
@@ -254,7 +294,7 @@ public class Balancer {
final Block b = block.getBlock();
return b + " with size=" + b.getNumBytes() + " from "
+ source.getDisplayName() + " to " + target.getDisplayName()
- + " through " + proxySource.getDisplayName();
+ + " through " + proxySource.datanode;
}
/* choose a block & a proxy source for this pendingMove
@@ -306,20 +346,20 @@ public class Balancer {
final DatanodeInfo targetDN = target.getDatanode();
// if node group is supported, first try add nodes in the same node group
if (cluster.isNodeGroupAware()) {
- for (BalancerDatanode loc : block.getLocations()) {
+ for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) {
return true;
}
}
}
// check if there is replica which is on the same rack with the target
- for (BalancerDatanode loc : block.getLocations()) {
+ for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
return true;
}
}
// find out a non-busy replica
- for (BalancerDatanode loc : block.getLocations()) {
+ for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
if (addTo(loc)) {
return true;
}
@@ -327,8 +367,9 @@ public class Balancer {
return false;
}
- // add a BalancerDatanode as proxy source for specific block movement
- private boolean addTo(BalancerDatanode bdn) {
+ /** add to a proxy source for specific block movement */
+ private boolean addTo(BalancerDatanode.StorageGroup g) {
+ final BalancerDatanode bdn = g.getBalancerDatanode();
if (bdn.addPendingBlock(this)) {
proxySource = bdn;
return true;
@@ -344,7 +385,7 @@ public class Balancer {
DataInputStream in = null;
try {
sock.connect(
- NetUtils.createSocketAddr(target.datanode.getXferAddr()),
+ NetUtils.createSocketAddr(target.getDatanode().getXferAddr()),
HdfsServerConstants.READ_TIMEOUT);
/* Unfortunately we don't have a good way to know if the Datanode is
* taking a really long time to move a block, OR something has
@@ -361,7 +402,7 @@ public class Balancer {
ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
- unbufIn, nnc, accessToken, target.datanode);
+ unbufIn, nnc, accessToken, target.getDatanode());
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
@@ -381,14 +422,14 @@ public class Balancer {
* gets out of sync with work going on in datanode.
*/
proxySource.activateDelay(DELAY_AFTER_ERROR);
- target.activateDelay(DELAY_AFTER_ERROR);
+ target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR);
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeSocket(sock);
proxySource.removePendingBlock(this);
- target.removePendingBlock(this);
+ target.getBalancerDatanode().removePendingBlock(this);
synchronized (this ) {
reset();
@@ -404,7 +445,7 @@ public class Balancer {
StorageType storageType,
Token<BlockTokenIdentifier> accessToken) throws IOException {
new Sender(out).replaceBlock(eb, storageType, accessToken,
- source.getStorageID(), proxySource.getDatanode());
+ source.getDatanode().getDatanodeUuid(), proxySource.datanode);
}
/* Receive a block copy response from the input stream */
@@ -444,8 +485,9 @@ public class Balancer {
/* A class for keeping track of blocks in the Balancer */
static private class BalancerBlock {
private final Block block; // the block
- private final List<BalancerDatanode> locations
- = new ArrayList<BalancerDatanode>(3); // its locations
+ /** The locations of the replicas of the block. */
+ private final List<BalancerDatanode.StorageGroup> locations
+ = new ArrayList<BalancerDatanode.StorageGroup>(3);
/* Constructor */
private BalancerBlock(Block block) {
@@ -458,20 +500,19 @@ public class Balancer {
}
/* add a location */
- private synchronized void addLocation(BalancerDatanode datanode) {
- if (!locations.contains(datanode)) {
- locations.add(datanode);
+ private synchronized void addLocation(BalancerDatanode.StorageGroup g) {
+ if (!locations.contains(g)) {
+ locations.add(g);
}
}
- /* Return if the block is located on <code>datanode</code> */
- private synchronized boolean isLocatedOnDatanode(
- BalancerDatanode datanode) {
- return locations.contains(datanode);
+ /** @return if the block is located on the given storage group. */
+ private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) {
+ return locations.contains(g);
}
/* Return its locations */
- private synchronized List<BalancerDatanode> getLocations() {
+ private synchronized List<BalancerDatanode.StorageGroup> getLocations() {
return locations;
}
@@ -488,37 +529,84 @@ public class Balancer {
/* The class represents a desired move of bytes between two nodes
* and the target.
- * An object of this class is stored in a source node.
+ * An object of this class is stored in a source.
*/
- static private class NodeTask {
- private final BalancerDatanode datanode; //target node
+ static private class Task {
+ private final BalancerDatanode.StorageGroup target;
private long size; //bytes scheduled to move
/* constructor */
- private NodeTask(BalancerDatanode datanode, long size) {
- this.datanode = datanode;
+ private Task(BalancerDatanode.StorageGroup target, long size) {
+ this.target = target;
this.size = size;
}
-
- /* Get the node */
- private BalancerDatanode getDatanode() {
- return datanode;
- }
-
- /* Get the number of bytes that need to be moved */
- private long getSize() {
- return size;
- }
}
/* A class that keeps track of a datanode in Balancer */
private static class BalancerDatanode {
- final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
+
+ /** A group of storages in a datanode with the same storage type. */
+ private class StorageGroup {
+ final StorageType storageType;
+ final double utilization;
+ final long maxSize2Move;
+ private long scheduledSize = 0L;
+
+ private StorageGroup(StorageType storageType, double utilization,
+ long maxSize2Move) {
+ this.storageType = storageType;
+ this.utilization = utilization;
+ this.maxSize2Move = maxSize2Move;
+ }
+
+ BalancerDatanode getBalancerDatanode() {
+ return BalancerDatanode.this;
+ }
+
+ DatanodeInfo getDatanode() {
+ return BalancerDatanode.this.datanode;
+ }
+
+ /** Decide if still need to move more bytes */
+ protected synchronized boolean hasSpaceForScheduling() {
+ return availableSizeToMove() > 0L;
+ }
+
+ /** @return the total number of bytes that need to be moved */
+ synchronized long availableSizeToMove() {
+ return maxSize2Move - scheduledSize;
+ }
+
+ /** increment scheduled size */
+ synchronized void incScheduledSize(long size) {
+ scheduledSize += size;
+ }
+
+ /** @return scheduled size */
+ synchronized long getScheduledSize() {
+ return scheduledSize;
+ }
+
+ /** Reset scheduled size to zero. */
+ synchronized void resetScheduledSize() {
+ scheduledSize = 0L;
+ }
+
+ /** @return the name for display */
+ String getDisplayName() {
+ return datanode + ":" + storageType;
+ }
+
+ @Override
+ public String toString() {
+ return "" + utilization;
+ }
+ }
+
final DatanodeInfo datanode;
- final double utilization;
- final long maxSize2Move;
- private long scheduledSize = 0L;
+ final EnumMap<StorageType, StorageGroup> storageMap
+ = new EnumMap<StorageType, StorageGroup>(StorageType.class);
protected long delayUntil = 0L;
// blocks being moved but not confirmed yet
private final List<PendingBlockMove> pendingBlocks;
@@ -526,78 +614,38 @@ public class Balancer {
@Override
public String toString() {
- return getClass().getSimpleName() + "[" + datanode
- + ", utilization=" + utilization + "]";
+ return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
}
/* Constructor
* Depending on avgutil & threshold, calculate maximum bytes to move
*/
- private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold,
- int maxConcurrentMoves) {
- datanode = node;
- utilization = policy.getUtilization(node);
- final double avgUtil = policy.getAvgUtilization();
- long maxSizeToMove;
-
- if (utilization >= avgUtil+threshold
- || utilization <= avgUtil-threshold) {
- maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
- } else {
- maxSizeToMove =
- (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100);
- }
- if (utilization < avgUtil ) {
- maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
- }
- this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+ private BalancerDatanode(DatanodeStorageReport report,
+ double threshold, int maxConcurrentMoves) {
+ this.datanode = report.getDatanodeInfo();
this.maxConcurrentMoves = maxConcurrentMoves;
this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
}
- /** Get the datanode */
- protected DatanodeInfo getDatanode() {
- return datanode;
- }
-
- /** Get the name of the datanode */
- protected String getDisplayName() {
- return datanode.toString();
- }
-
- /* Get the storage id of the datanode */
- protected String getStorageID() {
- return datanode.getDatanodeUuid();
- }
-
- /** Decide if still need to move more bytes */
- protected synchronized boolean hasSpaceForScheduling() {
- return scheduledSize<maxSize2Move;
- }
-
- /** Return the total number of bytes that need to be moved */
- protected synchronized long availableSizeToMove() {
- return maxSize2Move-scheduledSize;
- }
-
- /** increment scheduled size */
- protected synchronized void incScheduledSize(long size) {
- scheduledSize += size;
- }
-
- /** decrement scheduled size */
- protected synchronized void decScheduledSize(long size) {
- scheduledSize -= size;
- }
-
- /** get scheduled size */
- protected synchronized long getScheduledSize(){
- return scheduledSize;
- }
-
- /** get scheduled size */
- protected synchronized void setScheduledSize(long size){
- scheduledSize = size;
+ private void put(StorageType storageType, StorageGroup g) {
+ final StorageGroup existing = storageMap.put(storageType, g);
+ Preconditions.checkState(existing == null);
+ }
+
+ StorageGroup addStorageGroup(StorageType storageType, double utilization,
+ long maxSize2Move) {
+ final StorageGroup g = new StorageGroup(storageType, utilization,
+ maxSize2Move);
+ put(storageType, g);
+ return g;
+ }
+
+ Source addSource(StorageType storageType, double utilization,
+ long maxSize2Move, Balancer balancer) {
+ final Source s = balancer.new Source(storageType, utilization,
+ maxSize2Move, this);
+ put(storageType, s);
+ return s;
}
synchronized private void activateDelay(long delta) {
@@ -640,9 +688,9 @@ public class Balancer {
return pendingBlocks.remove(pendingBlock);
}
}
-
+
/** A node that can be the sources of a block move */
- private class Source extends BalancerDatanode {
+ private class Source extends BalancerDatanode.StorageGroup {
/* A thread that initiates a block move
* and waits for block move to complete */
@@ -653,7 +701,7 @@ public class Balancer {
}
}
- private final ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2);
+ private final List<Task> tasks = new ArrayList<Task>(2);
private long blocksToReceive = 0L;
/* source blocks point to balancerBlocks in the global list because
* we want to keep one copy of a block in balancer and be aware that
@@ -663,17 +711,17 @@ public class Balancer {
= new ArrayList<BalancerBlock>();
/* constructor */
- private Source(DatanodeInfo node, BalancingPolicy policy, double threshold,
- int maxConcurrentMoves) {
- super(node, policy, threshold, maxConcurrentMoves);
+ private Source(StorageType storageType, double utilization,
+ long maxSize2Move, BalancerDatanode dn) {
+ dn.super(storageType, utilization, maxSize2Move);
}
- /** Add a node task */
- private void addNodeTask(NodeTask task) {
- assert (task.datanode != this) :
- "Source and target are the same " + datanode;
- incScheduledSize(task.getSize());
- nodeTasks.add(task);
+ /** Add a task */
+ private void addTask(Task task) {
+ Preconditions.checkState(task.target != this,
+ "Source and target are the same storage group " + getDisplayName());
+ incScheduledSize(task.size);
+ tasks.add(task);
}
/* Return an iterator to this source's blocks */
@@ -686,8 +734,10 @@ public class Balancer {
* Return the total size of the received blocks in the number of bytes.
*/
private long getBlockList() throws IOException {
- BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode,
- Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
+ final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
+ final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(
+ getDatanode(), size).getBlocks();
+
long bytesReceived = 0;
for (BlockWithLocations blk : newBlocks) {
bytesReceived += blk.getBlock().getNumBytes();
@@ -703,10 +753,13 @@ public class Balancer {
synchronized (block) {
// update locations
- for (String datanodeUuid : blk.getDatanodeUuids()) {
- final BalancerDatanode d = datanodeMap.get(datanodeUuid);
- if (d != null) { // not an unknown datanode
- block.addLocation(d);
+ final String[] datanodeUuids = blk.getDatanodeUuids();
+ final StorageType[] storageTypes = blk.getStorageTypes();
+ for (int i = 0; i < datanodeUuids.length; i++) {
+ final BalancerDatanode.StorageGroup g = storageGroupMap.get(
+ datanodeUuids[i], storageTypes[i]);
+ if (g != null) { // not unknown
+ block.addLocation(g);
}
}
}
@@ -721,8 +774,8 @@ public class Balancer {
/* Decide if the given block is a good candidate to move or not */
private boolean isGoodBlockCandidate(BalancerBlock block) {
- for (NodeTask nodeTask : nodeTasks) {
- if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) {
+ for (Task t : tasks) {
+ if (Balancer.this.isGoodBlockCandidate(this, t.target, block)) {
return true;
}
}
@@ -737,20 +790,20 @@ public class Balancer {
* The block should be dispatched immediately after this method is returned.
*/
private PendingBlockMove chooseNextBlockToMove() {
- for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) {
- NodeTask task = tasks.next();
- BalancerDatanode target = task.getDatanode();
+ for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
+ final Task task = i.next();
+ final BalancerDatanode target = task.target.getBalancerDatanode();
PendingBlockMove pendingBlock = new PendingBlockMove();
if (target.addPendingBlock(pendingBlock)) {
// target is not busy, so do a tentative block allocation
pendingBlock.source = this;
- pendingBlock.target = target;
+ pendingBlock.target = task.target;
if ( pendingBlock.chooseBlockAndProxy() ) {
long blockSize = pendingBlock.block.getNumBytes();
- decScheduledSize(blockSize);
+ incScheduledSize(-blockSize);
task.size -= blockSize;
if (task.size == 0) {
- tasks.remove();
+ i.remove();
}
return pendingBlock;
} else {
@@ -824,7 +877,7 @@ public class Balancer {
// in case no blocks can be moved for source node's task,
// jump out of while-loop after 5 iterations.
if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
- setScheduledSize(0);
+ resetScheduledSize();
}
}
@@ -869,6 +922,8 @@ public class Balancer {
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
this.threshold = p.threshold;
this.policy = p.policy;
+ this.nodesToBeExcluded = p.nodesToBeExcluded;
+ this.nodesToBeIncluded = p.nodesToBeIncluded;
this.nnc = theblockpool;
cluster = NetworkTopology.getInstance(conf);
@@ -889,95 +944,154 @@ public class Balancer {
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
}
- /* Given a data node set, build a network topology and decide
- * over-utilized datanodes, above average utilized datanodes,
- * below average utilized datanodes, and underutilized datanodes.
- * The input data node set is shuffled before the datanodes
- * are put into the over-utilized datanodes, above average utilized
- * datanodes, below average utilized datanodes, and
- * underutilized datanodes lists. This will add some randomness
- * to the node matching later on.
- *
+
+ private static long getCapacity(DatanodeStorageReport report, StorageType t) {
+ long capacity = 0L;
+ for(StorageReport r : report.getStorageReports()) {
+ if (r.getStorage().getStorageType() == t) {
+ capacity += r.getCapacity();
+ }
+ }
+ return capacity;
+ }
+
+ private static long getRemaining(DatanodeStorageReport report, StorageType t) {
+ long remaining = 0L;
+ for(StorageReport r : report.getStorageReports()) {
+ if (r.getStorage().getStorageType() == t) {
+ remaining += r.getRemaining();
+ }
+ }
+ return remaining;
+ }
+
+ private boolean shouldIgnore(DatanodeInfo dn) {
+ //ignore decommissioned nodes
+ final boolean decommissioned = dn.isDecommissioned();
+ //ignore decommissioning nodes
+ final boolean decommissioning = dn.isDecommissionInProgress();
+ // ignore nodes in exclude list
+ final boolean excluded = Util.shouldBeExcluded(nodesToBeExcluded, dn);
+ // ignore nodes not in the include list (if include list is not empty)
+ final boolean notIncluded = !Util.shouldBeIncluded(nodesToBeIncluded, dn);
+
+ if (decommissioned || decommissioning || excluded || notIncluded) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
+ + decommissioning + ", " + excluded + ", " + notIncluded);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Given a datanode storage set, build a network topology and decide
+ * over-utilized storages, above average utilized storages,
+ * below average utilized storages, and underutilized storages.
+ * The input datanode storage set is shuffled in order to randomize
+ * the storage matching later on.
+ *
* @return the total number of bytes that are
* needed to move to make the cluster balanced.
- * @param datanodes a set of datanodes
+ * @param reports a set of datanode storage reports
*/
- private long initNodes(DatanodeInfo[] datanodes) {
+ private long init(DatanodeStorageReport[] reports) {
// compute average utilization
- for (DatanodeInfo datanode : datanodes) {
- if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
- continue; // ignore decommissioning or decommissioned nodes
+ for (DatanodeStorageReport r : reports) {
+ if (shouldIgnore(r.getDatanodeInfo())) {
+ continue;
}
- policy.accumulateSpaces(datanode);
+ policy.accumulateSpaces(r);
}
policy.initAvgUtilization();
- /*create network topology and all data node lists:
- * overloaded, above-average, below-average, and underloaded
- * we alternates the accessing of the given datanodes array either by
- * an increasing order or a decreasing order.
- */
+ // create network topology and classify utilization collections:
+ // over-utilized, above-average, below-average and under-utilized.
long overLoadedBytes = 0L, underLoadedBytes = 0L;
- for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
- if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+ for(DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
+ final DatanodeInfo datanode = r.getDatanodeInfo();
+ if (shouldIgnore(datanode)) {
continue; // ignore decommissioning or decommissioned nodes
}
cluster.add(datanode);
- BalancerDatanode datanodeS;
- final double avg = policy.getAvgUtilization();
- if (policy.getUtilization(datanode) >= avg) {
- datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode);
- if (isAboveAvgUtilized(datanodeS)) {
- this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
- } else {
- assert(isOverUtilized(datanodeS)) :
- datanodeS.getDisplayName()+ "is not an overUtilized node";
- this.overUtilizedDatanodes.add((Source)datanodeS);
- overLoadedBytes += (long)((datanodeS.utilization-avg
- -threshold)*datanodeS.datanode.getCapacity()/100.0);
+
+ final BalancerDatanode dn = new BalancerDatanode(r, threshold,
+ maxConcurrentMovesPerNode);
+ for(StorageType t : StorageType.asList()) {
+ final Double utilization = policy.getUtilization(r, t);
+ if (utilization == null) { // datanode does not have such storage type
+ continue;
}
- } else {
- datanodeS = new BalancerDatanode(datanode, policy, threshold,
- maxConcurrentMovesPerNode);
- if ( isBelowOrEqualAvgUtilized(datanodeS)) {
- this.belowAvgUtilizedDatanodes.add(datanodeS);
+
+ final long capacity = getCapacity(r, t);
+ final double utilizationDiff = utilization - policy.getAvgUtilization(t);
+ final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
+ final long maxSize2Move = computeMaxSize2Move(capacity,
+ getRemaining(r, t), utilizationDiff, threshold);
+
+ final BalancerDatanode.StorageGroup g;
+ if (utilizationDiff > 0) {
+ final Source s = dn.addSource(t, utilization, maxSize2Move, this);
+ if (thresholdDiff <= 0) { // within threshold
+ aboveAvgUtilized.add(s);
+ } else {
+ overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+ overUtilized.add(s);
+ }
+ g = s;
} else {
- assert isUnderUtilized(datanodeS) : "isUnderUtilized("
- + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS)
- + ", utilization=" + datanodeS.utilization;
- this.underUtilizedDatanodes.add(datanodeS);
- underLoadedBytes += (long)((avg-threshold-
- datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
+ g = dn.addStorageGroup(t, utilization, maxSize2Move);
+ if (thresholdDiff <= 0) { // within threshold
+ belowAvgUtilized.add(g);
+ } else {
+ underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+ underUtilized.add(g);
+ }
}
+ storageGroupMap.put(g);
}
- datanodeMap.put(datanode.getDatanodeUuid(), datanodeS);
}
- //logging
- logNodes();
+ logUtilizationCollections();
- assert (this.datanodeMap.size() ==
- overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
- aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
- : "Mismatched number of datanodes";
+ Preconditions.checkState(storageGroupMap.size() == overUtilized.size()
+ + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(),
+ "Mismatched number of storage groups");
// return number of bytes to be moved in order to make the cluster balanced
return Math.max(overLoadedBytes, underLoadedBytes);
}
+ private static long computeMaxSize2Move(final long capacity, final long remaining,
+ final double utilizationDiff, final double threshold) {
+ final double diff = Math.min(threshold, Math.abs(utilizationDiff));
+ long maxSizeToMove = precentage2bytes(diff, capacity);
+ if (utilizationDiff < 0) {
+ maxSizeToMove = Math.min(remaining, maxSizeToMove);
+ }
+ return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+ }
+
+ private static long precentage2bytes(double precentage, long capacity) {
+ Preconditions.checkArgument(precentage >= 0,
+ "precentage = " + precentage + " < 0");
+ return (long)(precentage * capacity / 100.0);
+ }
+
/* log the over utilized & under utilized nodes */
- private void logNodes() {
- logNodes("over-utilized", overUtilizedDatanodes);
+ private void logUtilizationCollections() {
+ logUtilizationCollection("over-utilized", overUtilized);
if (LOG.isTraceEnabled()) {
- logNodes("above-average", aboveAvgUtilizedDatanodes);
- logNodes("below-average", belowAvgUtilizedDatanodes);
+ logUtilizationCollection("above-average", aboveAvgUtilized);
+ logUtilizationCollection("below-average", belowAvgUtilized);
}
- logNodes("underutilized", underUtilizedDatanodes);
+ logUtilizationCollection("underutilized", underUtilized);
}
- private static <T extends BalancerDatanode> void logNodes(
- String name, Collection<T> nodes) {
- LOG.info(nodes.size() + " " + name + ": " + nodes);
+ private static <T extends BalancerDatanode.StorageGroup>
+ void logUtilizationCollection(String name, Collection<T> items) {
+ LOG.info(items.size() + " " + name + ": " + items);
}
/** A matcher interface for matching nodes. */
@@ -1013,26 +1127,24 @@ public class Balancer {
/**
* Decide all <source, target> pairs and
* the number of bytes to move from a source to a target
- * Maximum bytes to be moved per node is
- * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
- * Return total number of bytes to move in this iteration
+ * Maximum bytes to be moved per storage group is
+ * min(1 bandwidth worth of bytes, MAX_SIZE_TO_MOVE).
+ * @return total number of bytes to move in this iteration
*/
- private long chooseNodes() {
+ private long chooseStorageGroups() {
// First, match nodes on the same node group if cluster is node group aware
if (cluster.isNodeGroupAware()) {
- chooseNodes(SAME_NODE_GROUP);
+ chooseStorageGroups(SAME_NODE_GROUP);
}
// Then, match nodes on the same rack
- chooseNodes(SAME_RACK);
+ chooseStorageGroups(SAME_RACK);
// At last, match all remaining nodes
- chooseNodes(ANY_OTHER);
+ chooseStorageGroups(ANY_OTHER);
- assert (datanodeMap.size() >= sources.size()+targets.size())
- : "Mismatched number of datanodes (" +
- datanodeMap.size() + " total, " +
- sources.size() + " sources, " +
- targets.size() + " targets)";
+ Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
+ "Mismatched number of datanodes (" + storageGroupMap.size() + " < "
+ + sources.size() + " sources, " + targets.size() + " targets)");
long bytesToMove = 0L;
for (Source src : sources) {
@@ -1042,25 +1154,25 @@ public class Balancer {
}
/** Decide all <source, target> pairs according to the matcher. */
- private void chooseNodes(final Matcher matcher) {
+ private void chooseStorageGroups(final Matcher matcher) {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
- chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
+ chooseStorageGroups(overUtilized, underUtilized, matcher);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
- chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
+ chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
/* match each remaining underutilized datanode (target) to
* above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
- chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
+ chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
}
/**
@@ -1068,13 +1180,14 @@ public class Balancer {
* datanodes or the candidates are source nodes with (utilization > Avg), and
* the others are target nodes with (utilization < Avg).
*/
- private <D extends BalancerDatanode, C extends BalancerDatanode> void
- chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
+ private <G extends BalancerDatanode.StorageGroup,
+ C extends BalancerDatanode.StorageGroup>
+ void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
Matcher matcher) {
- for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
- final D datanode = i.next();
- for(; chooseForOneDatanode(datanode, candidates, matcher); );
- if (!datanode.hasSpaceForScheduling()) {
+ for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
+ final G g = i.next();
+ for(; choose4One(g, candidates, matcher); );
+ if (!g.hasSpaceForScheduling()) {
i.remove();
}
}
@@ -1084,18 +1197,19 @@ public class Balancer {
* For the given datanode, choose a candidate and then schedule it.
* @return true if a candidate is chosen; false if no candidate is chosen.
*/
- private <C extends BalancerDatanode> boolean chooseForOneDatanode(
- BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
+ private <C extends BalancerDatanode.StorageGroup>
+ boolean choose4One(BalancerDatanode.StorageGroup g,
+ Collection<C> candidates, Matcher matcher) {
final Iterator<C> i = candidates.iterator();
- final C chosen = chooseCandidate(dn, i, matcher);
-
+ final C chosen = chooseCandidate(g, i, matcher);
+
if (chosen == null) {
return false;
}
- if (dn instanceof Source) {
- matchSourceWithTargetToMove((Source)dn, chosen);
+ if (g instanceof Source) {
+ matchSourceWithTargetToMove((Source)g, chosen);
} else {
- matchSourceWithTargetToMove((Source)chosen, dn);
+ matchSourceWithTargetToMove((Source)chosen, g);
}
if (!chosen.hasSpaceForScheduling()) {
i.remove();
@@ -1103,27 +1217,28 @@ public class Balancer {
return true;
}
- private void matchSourceWithTargetToMove(
- Source source, BalancerDatanode target) {
+ private void matchSourceWithTargetToMove(Source source,
+ BalancerDatanode.StorageGroup target) {
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
- NodeTask nodeTask = new NodeTask(target, size);
- source.addNodeTask(nodeTask);
- target.incScheduledSize(nodeTask.getSize());
+ final Task task = new Task(target, size);
+ source.addTask(task);
+ target.incScheduledSize(task.size);
sources.add(source);
targets.add(target);
LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
- +source.datanode.getName() + " to " + target.datanode.getName());
+ + source.getDisplayName() + " to " + target.getDisplayName());
}
/** Choose a candidate for the given datanode. */
- private <D extends BalancerDatanode, C extends BalancerDatanode>
- C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
- if (dn.hasSpaceForScheduling()) {
+ private <G extends BalancerDatanode.StorageGroup,
+ C extends BalancerDatanode.StorageGroup>
+ C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
+ if (g.hasSpaceForScheduling()) {
for(; candidates.hasNext(); ) {
final C c = candidates.next();
if (!c.hasSpaceForScheduling()) {
candidates.remove();
- } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
+ } else if (matcher.match(cluster, g.getDatanode(), c.getDatanode())) {
return c;
}
}
@@ -1177,9 +1292,10 @@ public class Balancer {
boolean shouldWait;
do {
shouldWait = false;
- for (BalancerDatanode target : targets) {
- if (!target.isPendingQEmpty()) {
+ for (BalancerDatanode.StorageGroup target : targets) {
+ if (!target.getBalancerDatanode().isPendingQEmpty()) {
shouldWait = true;
+ break;
}
}
if (shouldWait) {
@@ -1248,12 +1364,15 @@ public class Balancer {
* 3. doing the move does not reduce the number of racks that the block has
*/
private boolean isGoodBlockCandidate(Source source,
- BalancerDatanode target, BalancerBlock block) {
+ BalancerDatanode.StorageGroup target, BalancerBlock block) {
+ if (source.storageType != target.storageType) {
+ return false;
+ }
// check if the block is moved or not
if (movedBlocks.contains(block)) {
- return false;
+ return false;
}
- if (block.isLocatedOnDatanode(target)) {
+ if (block.isLocatedOn(target)) {
return false;
}
if (cluster.isNodeGroupAware() &&
@@ -1268,8 +1387,8 @@ public class Balancer {
} else {
boolean notOnSameRack = true;
synchronized (block) {
- for (BalancerDatanode loc : block.locations) {
- if (cluster.isOnSameRack(loc.datanode, target.datanode)) {
+ for (BalancerDatanode.StorageGroup loc : block.locations) {
+ if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
notOnSameRack = false;
break;
}
@@ -1280,9 +1399,9 @@ public class Balancer {
goodBlock = true;
} else {
// good if source is on the same rack as one of the replicas
- for (BalancerDatanode loc : block.locations) {
+ for (BalancerDatanode.StorageGroup loc : block.locations) {
if (loc != source &&
- cluster.isOnSameRack(loc.datanode, source.datanode)) {
+ cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
goodBlock = true;
break;
}
@@ -1303,25 +1422,26 @@ public class Balancer {
* @return true if there are any replica (other than source) on the same node
* group with target
*/
- private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+ private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
BalancerBlock block, Source source) {
- for (BalancerDatanode loc : block.locations) {
+ final DatanodeInfo targetDn = target.getDatanode();
+ for (BalancerDatanode.StorageGroup loc : block.locations) {
if (loc != source &&
- cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
- return true;
- }
+ cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
+ return true;
}
+ }
return false;
}
/* reset all fields in a balancer preparing for the next iteration */
private void resetData(Configuration conf) {
this.cluster = NetworkTopology.getInstance(conf);
- this.overUtilizedDatanodes.clear();
- this.aboveAvgUtilizedDatanodes.clear();
- this.belowAvgUtilizedDatanodes.clear();
- this.underUtilizedDatanodes.clear();
- this.datanodeMap.clear();
+ this.overUtilized.clear();
+ this.aboveAvgUtilized.clear();
+ this.belowAvgUtilized.clear();
+ this.underUtilized.clear();
+ this.storageGroupMap.clear();
this.sources.clear();
this.targets.clear();
this.policy.reset();
@@ -1341,32 +1461,6 @@ public class Balancer {
}
}
}
-
- /* Return true if the given datanode is overUtilized */
- private boolean isOverUtilized(BalancerDatanode datanode) {
- return datanode.utilization > (policy.getAvgUtilization()+threshold);
- }
-
- /* Return true if the given datanode is above or equal to average utilized
- * but not overUtilized */
- private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
- final double avg = policy.getAvgUtilization();
- return (datanode.utilization <= (avg+threshold))
- && (datanode.utilization >= avg);
- }
-
- /* Return true if the given datanode is underUtilized */
- private boolean isUnderUtilized(BalancerDatanode datanode) {
- return datanode.utilization < (policy.getAvgUtilization()-threshold);
- }
-
- /* Return true if the given datanode is below average utilized
- * but not underUtilized */
- private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
- final double avg = policy.getAvgUtilization();
- return (datanode.utilization >= (avg-threshold))
- && (datanode.utilization <= avg);
- }
// Exit status
enum ReturnStatus {
@@ -1394,7 +1488,8 @@ public class Balancer {
/* get all live datanodes of a cluster and their disk usage
* decide the number of bytes need to be moved
*/
- final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+ final long bytesLeftToMove = init(
+ nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE));
if (bytesLeftToMove == 0) {
System.out.println("The cluster is balanced. Exiting...");
return ReturnStatus.SUCCESS;
@@ -1408,7 +1503,7 @@ public class Balancer {
* in this iteration. Maximum bytes to be moved per node is
* Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
*/
- final long bytesToMove = chooseNodes();
+ final long bytesToMove = chooseStorageGroups();
if (bytesToMove == 0) {
System.out.println("No block can be moved. Exiting...");
return ReturnStatus.NO_MOVE_BLOCK;
@@ -1526,21 +1621,101 @@ public class Balancer {
}
static class Parameters {
- static final Parameters DEFALUT = new Parameters(
- BalancingPolicy.Node.INSTANCE, 10.0);
+ static final Parameters DEFAULT = new Parameters(
+ BalancingPolicy.Node.INSTANCE, 10.0,
+ Collections.<String> emptySet(), Collections.<String> emptySet());
final BalancingPolicy policy;
final double threshold;
+ // exclude the nodes in this set from balancing operations
+ Set<String> nodesToBeExcluded;
+ //include only these nodes in balancing operations
+ Set<String> nodesToBeIncluded;
- Parameters(BalancingPolicy policy, double threshold) {
+ Parameters(BalancingPolicy policy, double threshold,
+ Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
this.policy = policy;
this.threshold = threshold;
+ this.nodesToBeExcluded = nodesToBeExcluded;
+ this.nodesToBeIncluded = nodesToBeIncluded;
}
@Override
public String toString() {
return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
- + "[" + policy + ", threshold=" + threshold + "]";
+ + "[" + policy + ", threshold=" + threshold +
+ ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
+ ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
+ }
+ }
+
+ static class Util {
+
+ /**
+ * @param datanode
+ * @return returns true if data node is part of the excludedNodes.
+ */
+ static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) {
+ return isIn(excludedNodes, datanode);
+ }
+
+ /**
+ * @param datanode
+ * @return returns true if includedNodes is empty or data node is part of the includedNodes.
+ */
+ static boolean shouldBeIncluded(Set<String> includedNodes, DatanodeInfo datanode) {
+ return (includedNodes.isEmpty() ||
+ isIn(includedNodes, datanode));
+ }
+ /**
+ * Match is checked using host name, IP address, with and without port number.
+ * @param datanodeSet
+ * @param datanode
+ * @return true if the datanode's transfer address matches the set of nodes.
+ */
+ private static boolean isIn(Set<String> datanodeSet, DatanodeInfo datanode) {
+ return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) ||
+ isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) ||
+ isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort());
+ }
+
+ /**
+ * returns true if nodes contains host or host:port
+ * @param nodes
+ * @param host
+ * @param port
+ * @return
+ */
+ private static boolean isIn(Set<String> nodes, String host, int port) {
+ if (host == null) {
+ return false;
+ }
+ return (nodes.contains(host) || nodes.contains(host +":"+ port));
+ }
+
+ /**
+ * parse a comma separated string to obtain set of host names
+ * @param string
+ * @return
+ */
+ static Set<String> parseHostList(String string) {
+ String[] addrs = StringUtils.getTrimmedStrings(string);
+ return new HashSet<String>(Arrays.asList(addrs));
+ }
+
+ /**
+ * read set of host names from a file
+ * @param fileName
+ * @return
+ */
+ static Set<String> getHostListFromFile(String fileName) {
+ Set<String> nodes = new HashSet <String> ();
+ try {
+ HostsFileReader.readFileToSet("nodes", fileName, nodes);
+ return StringUtils.getTrimmedStrings(nodes);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Unable to open file: " + fileName);
+ }
}
}
@@ -1578,8 +1753,10 @@ public class Balancer {
/** parse command line arguments */
static Parameters parse(String[] args) {
- BalancingPolicy policy = Parameters.DEFALUT.policy;
- double threshold = Parameters.DEFALUT.threshold;
+ BalancingPolicy policy = Parameters.DEFAULT.policy;
+ double threshold = Parameters.DEFAULT.threshold;
+ Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
+ Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
if (args != null) {
try {
@@ -1608,18 +1785,38 @@ public class Balancer {
System.err.println("Illegal policy name: " + args[i]);
throw e;
}
+ } else if ("-exclude".equalsIgnoreCase(args[i])) {
+ i++;
+ if ("-f".equalsIgnoreCase(args[i])) {
+ nodesTobeExcluded = Util.getHostListFromFile(args[++i]);
+ } else {
+ nodesTobeExcluded = Util.parseHostList(args[i]);
+ }
+ } else if ("-include".equalsIgnoreCase(args[i])) {
+ i++;
+ if ("-f".equalsIgnoreCase(args[i])) {
+ nodesTobeIncluded = Util.getHostListFromFile(args[++i]);
+ } else {
+ nodesTobeIncluded = Util.parseHostList(args[i]);
+ }
} else {
throw new IllegalArgumentException("args = "
+ Arrays.toString(args));
}
}
+ if (!nodesTobeExcluded.isEmpty() && !nodesTobeIncluded.isEmpty()) {
+ System.err.println(
+ "-exclude and -include options cannot be specified together.");
+ throw new IllegalArgumentException(
+ "-exclude and -include options cannot be specified together.");
+ }
} catch(RuntimeException e) {
printUsage(System.err);
throw e;
}
}
- return new Parameters(policy, threshold);
+ return new Parameters(policy, threshold, nodesTobeExcluded, nodesTobeIncluded);
}
private static void printUsage(PrintStream out) {
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java Fri Aug 1 01:29:49 2014
@@ -18,7 +18,11 @@
package org.apache.hadoop.hdfs.server.balancer;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.hdfs.util.EnumDoubles;
/**
* Balancing policy.
@@ -28,31 +32,43 @@ import org.apache.hadoop.hdfs.protocol.D
*/
@InterfaceAudience.Private
abstract class BalancingPolicy {
- long totalCapacity;
- long totalUsedSpace;
- private double avgUtilization;
+ final EnumCounters<StorageType> totalCapacities
+ = new EnumCounters<StorageType>(StorageType.class);
+ final EnumCounters<StorageType> totalUsedSpaces
+ = new EnumCounters<StorageType>(StorageType.class);
+ final EnumDoubles<StorageType> avgUtilizations
+ = new EnumDoubles<StorageType>(StorageType.class);
void reset() {
- totalCapacity = 0L;
- totalUsedSpace = 0L;
- avgUtilization = 0.0;
+ totalCapacities.reset();
+ totalUsedSpaces.reset();
+ avgUtilizations.reset();
}
/** Get the policy name. */
abstract String getName();
/** Accumulate used space and capacity. */
- abstract void accumulateSpaces(DatanodeInfo d);
+ abstract void accumulateSpaces(DatanodeStorageReport r);
void initAvgUtilization() {
- this.avgUtilization = totalUsedSpace*100.0/totalCapacity;
+ for(StorageType t : StorageType.asList()) {
+ final long capacity = totalCapacities.get(t);
+ if (capacity > 0L) {
+ final double avg = totalUsedSpaces.get(t)*100.0/capacity;
+ avgUtilizations.set(t, avg);
+ }
+ }
}
- double getAvgUtilization() {
- return avgUtilization;
+
+ double getAvgUtilization(StorageType t) {
+ return avgUtilizations.get(t);
}
- /** Return the utilization of a datanode */
- abstract double getUtilization(DatanodeInfo d);
+ /** @return the utilization of a particular storage type of a datanode;
+ * or return null if the datanode does not have such storage type.
+ */
+ abstract Double getUtilization(DatanodeStorageReport r, StorageType t);
@Override
public String toString() {
@@ -84,14 +100,25 @@ abstract class BalancingPolicy {
}
@Override
- void accumulateSpaces(DatanodeInfo d) {
- totalCapacity += d.getCapacity();
- totalUsedSpace += d.getDfsUsed();
+ void accumulateSpaces(DatanodeStorageReport r) {
+ for(StorageReport s : r.getStorageReports()) {
+ final StorageType t = s.getStorage().getStorageType();
+ totalCapacities.add(t, s.getCapacity());
+ totalUsedSpaces.add(t, s.getDfsUsed());
+ }
}
@Override
- double getUtilization(DatanodeInfo d) {
- return d.getDfsUsed()*100.0/d.getCapacity();
+ Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+ long capacity = 0L;
+ long dfsUsed = 0L;
+ for(StorageReport s : r.getStorageReports()) {
+ if (s.getStorage().getStorageType() == t) {
+ capacity += s.getCapacity();
+ dfsUsed += s.getDfsUsed();
+ }
+ }
+ return capacity == 0L? null: dfsUsed*100.0/capacity;
}
}
@@ -108,14 +135,25 @@ abstract class BalancingPolicy {
}
@Override
- void accumulateSpaces(DatanodeInfo d) {
- totalCapacity += d.getCapacity();
- totalUsedSpace += d.getBlockPoolUsed();
+ void accumulateSpaces(DatanodeStorageReport r) {
+ for(StorageReport s : r.getStorageReports()) {
+ final StorageType t = s.getStorage().getStorageType();
+ totalCapacities.add(t, s.getCapacity());
+ totalUsedSpaces.add(t, s.getBlockPoolUsed());
+ }
}
@Override
- double getUtilization(DatanodeInfo d) {
- return d.getBlockPoolUsed()*100.0/d.getCapacity();
+ Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+ long capacity = 0L;
+ long blockPoolUsed = 0L;
+ for(StorageReport s : r.getStorageReports()) {
+ if (s.getStorage().getStorageType() == t) {
+ capacity += s.getCapacity();
+ blockPoolUsed += s.getBlockPoolUsed();
+ }
+ }
+ return capacity == 0L? null: blockPoolUsed*100.0/capacity;
}
}
}