You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2014/07/18 20:21:18 UTC
svn commit: r1611751 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/
src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/
src/main/java/org/apache/hadoop/hdfs/web/ src/main/java/org/apache/hadoop/...
Author: szetszwo
Date: Fri Jul 18 18:21:18 2014
New Revision: 1611751
URL: http://svn.apache.org/r1611751
Log:
svn merge -c 1611750 from trunk for HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it will not redirect retries to the same datanode.
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java
- copied unchanged from r1611750, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1611750
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1611751&r1=1611750&r2=1611751&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 18 18:21:18 2014
@@ -52,6 +52,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6700. BlockPlacementPolicy should choose storage but not datanode for
deletion. (szetszwo)
+ HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it
+ will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1611750
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1611751&r1=1611750&r2=1611751&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Fri Jul 18 18:21:18 2014
@@ -28,6 +28,7 @@ import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import javax.servlet.ServletContext;
@@ -84,6 +85,7 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
+import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.GroupParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
@@ -113,11 +115,13 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
@@ -190,12 +194,26 @@ public class NamenodeWebHdfsMethods {
}
return np;
}
-
+
@VisibleForTesting
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
- final long blocksize) throws IOException {
+ final long blocksize, final String excludeDatanodes) throws IOException {
final BlockManager bm = namenode.getNamesystem().getBlockManager();
+
+ HashSet<Node> excludes = new HashSet<Node>();
+ if (excludeDatanodes != null) {
+ for (String host : StringUtils
+ .getTrimmedStringCollection(excludeDatanodes)) {
+ int idx = host.indexOf(":");
+ if (idx != -1) {
+ excludes.add(bm.getDatanodeManager().getDatanodeByXferAddr(
+ host.substring(0, idx), Integer.parseInt(host.substring(idx + 1))));
+ } else {
+ excludes.add(bm.getDatanodeManager().getDatanodeByHost(host));
+ }
+ }
+ }
if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
@@ -204,7 +222,7 @@ public class NamenodeWebHdfsMethods {
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy()
.chooseTarget(path, 1, clientNode,
- new ArrayList<DatanodeStorageInfo>(), false, null, blocksize,
+ new ArrayList<DatanodeStorageInfo>(), false, excludes, blocksize,
// TODO: get storage type from the file
StorageType.DEFAULT);
if (storages.length > 0) {
@@ -233,7 +251,7 @@ public class NamenodeWebHdfsMethods {
final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
final int count = locations.locatedBlockCount();
if (count > 0) {
- return bestNode(locations.get(0).getLocations());
+ return bestNode(locations.get(0).getLocations(), excludes);
}
}
}
@@ -247,11 +265,14 @@ public class NamenodeWebHdfsMethods {
* sorted based on availability and network distances, thus it is sufficient
* to return the first element of the node here.
*/
- private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException {
- if (nodes.length == 0 || nodes[0].isDecommissioned()) {
- throw new IOException("No active nodes contain this block");
+ private static DatanodeInfo bestNode(DatanodeInfo[] nodes,
+ HashSet<Node> excludes) throws IOException {
+ for (DatanodeInfo dn: nodes) {
+ if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
+ return dn;
+ }
}
- return nodes[0];
+ throw new IOException("No active nodes contain this block");
}
private Token<? extends TokenIdentifier> generateDelegationToken(
@@ -270,11 +291,12 @@ public class NamenodeWebHdfsMethods {
final UserGroupInformation ugi, final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final String path, final HttpOpParam.Op op, final long openOffset,
- final long blocksize,
+ final long blocksize, final String excludeDatanodes,
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn;
try {
- dn = chooseDatanode(namenode, path, op, openOffset, blocksize);
+ dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
+ excludeDatanodes);
} catch (InvalidTopologyException ite) {
throw new IOException("Failed to find datanode, suggest to check cluster health.", ite);
}
@@ -361,13 +383,15 @@ public class NamenodeWebHdfsMethods {
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName,
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
- final OldSnapshotNameParam oldSnapshotName
- )throws IOException, InterruptedException {
+ final OldSnapshotNameParam oldSnapshotName,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
+ ) throws IOException, InterruptedException {
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
owner, group, permission, overwrite, bufferSize, replication,
blockSize, modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
- xattrSetFlag, snapshotName, oldSnapshotName);
+ xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
}
/** Handle HTTP PUT request. */
@@ -423,14 +447,16 @@ public class NamenodeWebHdfsMethods {
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName,
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
- final OldSnapshotNameParam oldSnapshotName
+ final OldSnapshotNameParam oldSnapshotName,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
group, permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, delegationTokenArgument,
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
- oldSnapshotName);
+ oldSnapshotName, excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@@ -441,7 +467,7 @@ public class NamenodeWebHdfsMethods {
permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
- xattrSetFlag, snapshotName, oldSnapshotName);
+ xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
} finally {
reset();
}
@@ -474,7 +500,8 @@ public class NamenodeWebHdfsMethods {
final XAttrValueParam xattrValue,
final XAttrSetFlagParam xattrSetFlag,
final SnapshotNameParam snapshotName,
- final OldSnapshotNameParam oldSnapshotName
+ final OldSnapshotNameParam oldSnapshotName,
+ final ExcludeDatanodesParam exclDatanodes
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@@ -484,9 +511,10 @@ public class NamenodeWebHdfsMethods {
switch(op.getValue()) {
case CREATE:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, blockSize.getValue(conf),
- permission, overwrite, bufferSize, replication, blockSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
+ exclDatanodes.getValue(), permission, overwrite, bufferSize,
+ replication, blockSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case MKDIRS:
@@ -619,9 +647,12 @@ public class NamenodeWebHdfsMethods {
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
- return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize);
+ return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs,
+ bufferSize, excludeDatanodes);
}
/** Handle HTTP POST request. */
@@ -643,17 +674,21 @@ public class NamenodeWebHdfsMethods {
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
- init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize);
+ init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
+ excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
try {
return post(ugi, delegation, username, doAsUser,
- path.getAbsolutePath(), op, concatSrcs, bufferSize);
+ path.getAbsolutePath(), op, concatSrcs, bufferSize,
+ excludeDatanodes);
} finally {
reset();
}
@@ -669,15 +704,17 @@ public class NamenodeWebHdfsMethods {
final String fullpath,
final PostOpParam op,
final ConcatSourcesParam concatSrcs,
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
switch(op.getValue()) {
case APPEND:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, -1L, bufferSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), -1L, -1L,
+ excludeDatanodes.getValue(), bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case CONCAT:
@@ -715,10 +752,12 @@ public class NamenodeWebHdfsMethods {
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final List<XAttrNameParam> xattrNames,
@QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
- final XAttrEncodingParam xattrEncoding
+ final XAttrEncodingParam xattrEncoding,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
- renewer, bufferSize, xattrNames, xattrEncoding);
+ renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes);
}
/** Handle HTTP GET request. */
@@ -747,11 +786,13 @@ public class NamenodeWebHdfsMethods {
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final List<XAttrNameParam> xattrNames,
@QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
- final XAttrEncodingParam xattrEncoding
+ final XAttrEncodingParam xattrEncoding,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, offset, length,
- renewer, bufferSize, xattrEncoding);
+ renewer, bufferSize, xattrEncoding, excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@@ -759,7 +800,7 @@ public class NamenodeWebHdfsMethods {
try {
return get(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
- xattrNames, xattrEncoding);
+ xattrNames, xattrEncoding, excludeDatanodes);
} finally {
reset();
}
@@ -779,7 +820,8 @@ public class NamenodeWebHdfsMethods {
final RenewerParam renewer,
final BufferSizeParam bufferSize,
final List<XAttrNameParam> xattrNames,
- final XAttrEncodingParam xattrEncoding
+ final XAttrEncodingParam xattrEncoding,
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final NamenodeProtocols np = getRPCServer(namenode);
@@ -787,8 +829,9 @@ public class NamenodeWebHdfsMethods {
switch(op.getValue()) {
case OPEN:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
+ excludeDatanodes.getValue(), offset, length, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GET_BLOCK_LOCATIONS:
@@ -824,7 +867,7 @@ public class NamenodeWebHdfsMethods {
case GETFILECHECKSUM:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, -1L);
+ fullpath, op.getValue(), -1L, -1L, null);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GETDELEGATIONTOKEN:
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1611751&r1=1611750&r2=1611751&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Fri Jul 18 18:21:18 2014
@@ -447,6 +447,7 @@ public class WebHdfsFileSystem extends F
protected final HttpOpParam.Op op;
private final boolean redirected;
+ protected ExcludeDatanodesParam excludeDatanodes = new ExcludeDatanodesParam("");
private boolean checkRetry;
@@ -498,6 +499,10 @@ public class WebHdfsFileSystem extends F
* a DN such as open and checksum
*/
private HttpURLConnection connect(URL url) throws IOException {
+ //redirect hostname and port
+ String redirectHost = null;
+
+
// resolve redirects for a DN operation unless already resolved
if (op.getRedirect() && !redirected) {
final HttpOpParam.Op redirectOp =
@@ -510,11 +515,24 @@ public class WebHdfsFileSystem extends F
try {
validateResponse(redirectOp, conn, false);
url = new URL(conn.getHeaderField("Location"));
+ redirectHost = url.getHost() + ":" + url.getPort();
} finally {
conn.disconnect();
}
}
- return connect(op, url);
+ try {
+ return connect(op, url);
+ } catch (IOException ioe) {
+ if (redirectHost != null) {
+ if (excludeDatanodes.getValue() != null) {
+ excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+ + excludeDatanodes.getValue());
+ } else {
+ excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
+ }
+ }
+ throw ioe;
+ }
}
private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
@@ -651,7 +669,14 @@ public class WebHdfsFileSystem extends F
@Override
protected URL getUrl() throws IOException {
- return toUrl(op, fspath, parameters);
+ if (excludeDatanodes.getValue() != null) {
+ Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1];
+ System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
+ tmpParam[parameters.length] = excludeDatanodes;
+ return toUrl(op, fspath, tmpParam);
+ } else {
+ return toUrl(op, fspath, parameters);
+ }
}
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java?rev=1611751&r1=1611750&r2=1611751&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java Fri Jul 18 18:21:18 2014
@@ -92,7 +92,7 @@ public class TestWebHdfsDataLocality {
//The chosen datanode must be the same as the client address
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, PutOpParam.Op.CREATE, -1L, blocksize);
+ namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null);
Assert.assertEquals(ipAddr, chosen.getIpAddr());
}
}
@@ -117,23 +117,104 @@ public class TestWebHdfsDataLocality {
{ //test GETFILECHECKSUM
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize);
+ namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null);
Assert.assertEquals(expected, chosen);
}
{ //test OPEN
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, GetOpParam.Op.OPEN, 0, blocksize);
+ namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null);
Assert.assertEquals(expected, chosen);
}
{ //test APPEND
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, PostOpParam.Op.APPEND, -1L, blocksize);
+ namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null);
Assert.assertEquals(expected, chosen);
}
} finally {
cluster.shutdown();
}
}
+
+ @Test
+ public void testExcludeDataNodes() throws Exception {
+ final Configuration conf = WebHdfsTestUtil.createConf();
+ final String[] racks = {RACK0, RACK0, RACK1, RACK1, RACK2, RACK2};
+ final String[] hosts = {"DataNode1", "DataNode2", "DataNode3","DataNode4","DataNode5","DataNode6"};
+ final int nDataNodes = hosts.length;
+ LOG.info("nDataNodes=" + nDataNodes + ", racks=" + Arrays.asList(racks)
+ + ", hosts=" + Arrays.asList(hosts));
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .hosts(hosts).numDataNodes(nDataNodes).racks(racks).build();
+
+ try {
+ cluster.waitActive();
+
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final NameNode namenode = cluster.getNameNode();
+ final DatanodeManager dm = namenode.getNamesystem().getBlockManager(
+ ).getDatanodeManager();
+ LOG.info("dm=" + dm);
+
+ final long blocksize = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+ final String f = "/foo";
+
+ //create a file with three replica.
+ final Path p = new Path(f);
+ final FSDataOutputStream out = dfs.create(p, (short)3);
+ out.write(1);
+ out.close();
+
+ //get replica location.
+ final LocatedBlocks locatedblocks = NameNodeAdapter.getBlockLocations(
+ namenode, f, 0, 1);
+ final List<LocatedBlock> lb = locatedblocks.getLocatedBlocks();
+ Assert.assertEquals(1, lb.size());
+ final DatanodeInfo[] locations = lb.get(0).getLocations();
+ Assert.assertEquals(3, locations.length);
+
+
+ //For GETFILECHECKSUM, OPEN and APPEND,
+ //the chosen datanode must be different with exclude nodes.
+
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < 2; i++) {
+ sb.append(locations[i].getXferAddr());
+ { // test GETFILECHECKSUM
+ final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+ namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
+ sb.toString());
+ for (int j = 0; j <= i; j++) {
+ Assert.assertNotEquals(locations[j].getHostName(),
+ chosen.getHostName());
+ }
+ }
+
+ { // test OPEN
+ final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+ namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString());
+ for (int j = 0; j <= i; j++) {
+ Assert.assertNotEquals(locations[j].getHostName(),
+ chosen.getHostName());
+ }
+ }
+
+ { // test APPEND
+ final DatanodeInfo chosen = NamenodeWebHdfsMethods
+ .chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
+ blocksize, sb.toString());
+ for (int j = 0; j <= i; j++) {
+ Assert.assertNotEquals(locations[j].getHostName(),
+ chosen.getHostName());
+ }
+ }
+
+ sb.append(",");
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
}
\ No newline at end of file