You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2016/10/12 20:13:45 UTC
hadoop git commit: HDFS-10789. Route webhdfs through the RPC call
queue. Contributed by Daryn Sharp and Rushabh S Shah.
Repository: hadoop
Updated Branches:
refs/heads/trunk 6476934ae -> 85cd06f66
HDFS-10789. Route webhdfs through the RPC call queue. Contributed by Daryn Sharp and Rushabh S Shah.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/85cd06f6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/85cd06f6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/85cd06f6
Branch: refs/heads/trunk
Commit: 85cd06f6636f295ad1f3bf2a90063f4714c9cca7
Parents: 6476934
Author: Kihwal Lee <ki...@apache.org>
Authored: Wed Oct 12 15:11:42 2016 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Wed Oct 12 15:11:42 2016 -0500
----------------------------------------------------------------------
.../org/apache/hadoop/ipc/ExternalCall.java | 9 +-
.../java/org/apache/hadoop/ipc/TestRPC.java | 6 +-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 +
.../hdfs/server/namenode/FSNamesystem.java | 15 +-
.../hadoop/hdfs/server/namenode/NameNode.java | 12 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 6 +-
.../web/resources/NamenodeWebHdfsMethods.java | 150 +++++++++++--------
.../src/main/resources/hdfs-default.xml | 7 +
.../server/namenode/TestNamenodeRetryCache.java | 25 +++-
.../web/resources/TestWebHdfsDataLocality.java | 25 +++-
10 files changed, 160 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85cd06f6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
index 9b4cbcf..5566136 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.ipc.Server.Call;
@@ -37,14 +38,10 @@ public abstract class ExternalCall<T> extends Call {
public abstract UserGroupInformation getRemoteUser();
- public final T get() throws IOException, InterruptedException {
+ public final T get() throws InterruptedException, ExecutionException {
waitForCompletion();
if (error != null) {
- if (error instanceof IOException) {
- throw (IOException)error;
- } else {
- throw new IOException(error);
- }
+ throw new ExecutionException(error);
}
return result;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85cd06f6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 92d9183..72b603a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -72,6 +72,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -989,8 +990,9 @@ public class TestRPC extends TestRpcBase {
try {
exceptionCall.get();
fail("didn't throw");
- } catch (IOException ioe) {
- assertEquals(expectedIOE.getMessage(), ioe.getMessage());
+ } catch (ExecutionException ee) {
+ assertTrue((ee.getCause()) instanceof IOException);
+ assertEquals(expectedIOE.getMessage(), ee.getCause().getMessage());
}
} finally {
server.stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85cd06f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 18209ae..10c0ad6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -70,6 +70,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.webhdfs.ugi.expire.after.access";
public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
10*60*1000; //10 minutes
+ public static final String DFS_WEBHDFS_USE_IPC_CALLQ =
+ "dfs.webhdfs.use.ipc.callq";
+ public static final boolean DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT = true;
// HA related configuration
public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85cd06f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b9b02ef..8c59186 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -242,7 +242,6 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -338,7 +337,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private void logAuditEvent(boolean succeeded, String cmd, String src,
String dst, HdfsFileStatus stat) throws IOException {
if (isAuditEnabled() && isExternalInvocation()) {
- logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(),
+ logAuditEvent(succeeded, Server.getRemoteUser(), Server.getRemoteIp(),
cmd, src, dst, stat);
}
}
@@ -5262,17 +5261,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* RPC call context even if the client exits.
*/
boolean isExternalInvocation() {
- return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
+ return Server.isRpcInvocation();
}
- private static InetAddress getRemoteIp() {
- InetAddress ip = Server.getRemoteIp();
- if (ip != null) {
- return ip;
- }
- return NamenodeWebHdfsMethods.getRemoteIp();
- }
-
// optimize ugi lookup for RPC operations to avoid a trip through
// UGI.getCurrentUser which is synch'ed
private static UserGroupInformation getRemoteUser() throws IOException {
@@ -6918,7 +6909,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
sb.append(trackingId);
}
sb.append("\t").append("proto=");
- sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
+ sb.append(Server.getProtocol());
if (isCallerContextEnabled &&
callerContext != null &&
callerContext.isContextValid()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85cd06f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index ae7a937..afedbb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -64,7 +64,9 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -407,7 +409,15 @@ public class NameNode extends ReconfigurableBase implements
public NamenodeProtocols getRpcServer() {
return rpcServer;
}
-
+
+ public void queueExternalCall(ExternalCall<?> extCall)
+ throws IOException, InterruptedException {
+ if (rpcServer == null) {
+ throw new RetriableException("Namenode is in startup mode");
+ }
+ rpcServer.getClientRpcServer().queueCall(extCall);
+ }
+
public static void initMetrics(Configuration conf, NamenodeRole role) {
metrics = NameNodeMetrics.create(conf, role);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85cd06f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 57f7cb1..a97a307 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -139,7 +139,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1686,10 +1685,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
private static String getClientMachine() {
- String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress();
- if (clientMachine == null) { //not a web client
- clientMachine = Server.getRemoteAddress();
- }
+ String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { //not a RPC client
clientMachine = "";
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85cd06f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index 3ab0c67..4887e35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -25,10 +25,13 @@ import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
@@ -60,6 +63,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -81,8 +85,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RetriableException;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.Credentials;
@@ -103,39 +107,39 @@ public class NamenodeWebHdfsMethods {
public static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class);
private static final UriFsPathParam ROOT = new UriFsPathParam("");
-
- private static final ThreadLocal<String> REMOTE_ADDRESS = new ThreadLocal<String>();
-
- /** @return the remote client address. */
- public static String getRemoteAddress() {
- return REMOTE_ADDRESS.get();
- }
-
- public static InetAddress getRemoteIp() {
- try {
- return InetAddress.getByName(getRemoteAddress());
- } catch (Exception e) {
- return null;
- }
- }
- /**
- * Returns true if a WebHdfs request is in progress. Akin to
- * {@link Server#isRpcInvocation()}.
- */
- public static boolean isWebHdfsInvocation() {
- return getRemoteAddress() != null;
- }
+ private volatile Boolean useIpcCallq;
+ private String scheme;
+ private Principal userPrincipal;
+ private String remoteAddr;
private @Context ServletContext context;
- private @Context HttpServletRequest request;
private @Context HttpServletResponse response;
+ public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
+ // the request object is a proxy to thread-locals so we have to extract
+ // what we want from it since the external call will be processed in a
+ // different thread.
+ scheme = request.getScheme();
+ userPrincipal = request.getUserPrincipal();
+ // get the remote address, if coming in via a trusted proxy server then
+ // the address with be that of the proxied client
+ remoteAddr = JspHelper.getRemoteAddr(request);
+ }
+
private void init(final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final UriFsPathParam path, final HttpOpParam<?> op,
final Param<?, ?>... parameters) {
+ if (useIpcCallq == null) {
+ Configuration conf =
+ (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
+ useIpcCallq = conf.getBoolean(
+ DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ,
+ DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT);
+ }
+
if (LOG.isTraceEnabled()) {
LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
+ ", ugi=" + ugi + ", " + username + ", " + doAsUser
@@ -144,16 +148,8 @@ public class NamenodeWebHdfsMethods {
//clear content type
response.setContentType(null);
-
- // set the remote address, if coming in via a trust proxy server then
- // the address with be that of the proxied client
- REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
}
- private void reset() {
- REMOTE_ADDRESS.set(null);
- }
-
private static NamenodeProtocols getRPCServer(NameNode namenode)
throws IOException {
final NamenodeProtocols np = namenode.getRpcServer();
@@ -162,11 +158,63 @@ public class NamenodeWebHdfsMethods {
}
return np;
}
-
+
+ private <T> T doAs(final UserGroupInformation ugi,
+ final PrivilegedExceptionAction<T> action)
+ throws IOException, InterruptedException {
+ return useIpcCallq ? doAsExternalCall(ugi, action) : ugi.doAs(action);
+ }
+
+ private <T> T doAsExternalCall(final UserGroupInformation ugi,
+ final PrivilegedExceptionAction<T> action)
+ throws IOException, InterruptedException {
+ // set the remote address, if coming in via a trust proxy server then
+ // the address with be that of the proxied client
+ ExternalCall<T> call = new ExternalCall<T>(action){
+ @Override
+ public UserGroupInformation getRemoteUser() {
+ return ugi;
+ }
+ @Override
+ public String getProtocol() {
+ return "webhdfs";
+ }
+ @Override
+ public String getHostAddress() {
+ return remoteAddr;
+ }
+ @Override
+ public InetAddress getHostInetAddress() {
+ try {
+ return InetAddress.getByName(getHostAddress());
+ } catch (UnknownHostException e) {
+ return null;
+ }
+ }
+ };
+ final NameNode namenode = (NameNode)context.getAttribute("name.node");
+ namenode.queueExternalCall(call);
+ T result = null;
+ try {
+ result = call.get();
+ } catch (ExecutionException ee) {
+ Throwable t = ee.getCause();
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException)t;
+ } else if (t instanceof IOException) {
+ throw (IOException)t;
+ } else {
+ throw new IOException(t);
+ }
+ }
+ return result;
+ }
+
@VisibleForTesting
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
- final long blocksize, final String excludeDatanodes) throws IOException {
+ final long blocksize, final String excludeDatanodes,
+ final String remoteAddr) throws IOException {
FSNamesystem fsn = namenode.getNamesystem();
if (fsn == null) {
throw new IOException("Namesystem has not been intialized yet.");
@@ -190,7 +238,7 @@ public class NamenodeWebHdfsMethods {
if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
- ).getDatanodeByHost(getRemoteAddress());
+ ).getDatanodeByHost(remoteAddr);
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
path, clientNode, excludes, blocksize);
@@ -253,7 +301,8 @@ public class NamenodeWebHdfsMethods {
return null;
}
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
- Text kind = request.getScheme().equals("http") ? WebHdfsConstants.WEBHDFS_TOKEN_KIND
+ Text kind = scheme.equals("http")
+ ? WebHdfsConstants.WEBHDFS_TOKEN_KIND
: WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
t.setKind(kind);
return t;
@@ -267,7 +316,7 @@ public class NamenodeWebHdfsMethods {
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn;
dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
- excludeDatanodes);
+ excludeDatanodes, remoteAddr);
if (dn == null) {
throw new IOException("Failed to find datanode, suggest to check cluster"
+ " health. excludeDatanodes=" + excludeDatanodes);
@@ -283,7 +332,7 @@ public class NamenodeWebHdfsMethods {
} else {
//generate a token
final Token<? extends TokenIdentifier> t = generateDelegationToken(
- namenode, ugi, request.getUserPrincipal().getName());
+ namenode, ugi, userPrincipal.getName());
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
}
final String query = op.toQueryString() + delegationQuery
@@ -291,7 +340,6 @@ public class NamenodeWebHdfsMethods {
+ Param.toSortedString("&", parameters);
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
- final String scheme = request.getScheme();
int port = "http".equals(scheme) ? dn.getInfoPort() : dn
.getInfoSecurePort();
final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath,
@@ -446,10 +494,9 @@ public class NamenodeWebHdfsMethods {
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam, noredirect);
- return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+ return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
- try {
return put(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, destination, owner, group,
permission, unmaskedPermission, overwrite, bufferSize,
@@ -458,9 +505,6 @@ public class NamenodeWebHdfsMethods {
aclPermission, xattrName, xattrValue, xattrSetFlag,
snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam, noredirect);
- } finally {
- reset();
- }
}
});
}
@@ -703,16 +747,12 @@ public class NamenodeWebHdfsMethods {
init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
excludeDatanodes, newLength);
- return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+ return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
- try {
return post(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, concatSrcs, bufferSize,
excludeDatanodes, newLength, noredirect);
- } finally {
- reset();
- }
}
});
}
@@ -858,17 +898,13 @@ public class NamenodeWebHdfsMethods {
renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction,
tokenKind, tokenService, startAfter);
- return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+ return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
- try {
return get(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind,
tokenService, noredirect, startAfter);
- } finally {
- reset();
- }
}
});
}
@@ -1138,15 +1174,11 @@ public class NamenodeWebHdfsMethods {
init(ugi, delegation, username, doAsUser, path, op, recursive, snapshotName);
- return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+ return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException {
- try {
return delete(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, recursive, snapshotName);
- } finally {
- reset();
- }
}
});
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85cd06f6/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index db4035d..84b51f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4281,4 +4281,11 @@
</description>
</property>
+ <property>
+ <name>dfs.webhdfs.use.ipc.callq</name>
+ <value>true</value>
+ <description>Enables routing of webhdfs calls through rpc
+ call queue</description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85cd06f6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
index 26efce5..d7a2c81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.LightWeightCache;
import org.junit.After;
@@ -111,19 +112,33 @@ public class TestNamenodeRetryCache {
}
}
+ static class DummyCall extends Server.Call {
+ private UserGroupInformation ugi;
+
+ DummyCall(int callId, byte[] clientId) {
+ super(callId, 1, null, null, RpcKind.RPC_PROTOCOL_BUFFER, clientId);
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException ioe) {
+ }
+ }
+ @Override
+ public UserGroupInformation getRemoteUser() {
+ return ugi;
+ }
+ }
/** Set the current Server RPC call */
public static void newCall() {
- Server.Call call = new Server.Call(++callId, 1, null, null,
- RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID);
+ Server.Call call = new DummyCall(++callId, CLIENT_ID);
Server.getCurCall().set(call);
}
public static void resetCall() {
- Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null,
- null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID);
+ Server.Call call = new DummyCall(RpcConstants.INVALID_CALL_ID,
+ RpcConstants.DUMMY_CLIENT_ID);
Server.getCurCall().set(call);
}
-
+
private void concatSetup(String file1, String file2) throws Exception {
DFSTestUtil.createFile(filesystem, new Path(file1), BlockSize, (short)1, 0L);
DFSTestUtil.createFile(filesystem, new Path(file2), BlockSize, (short)1, 0L);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85cd06f6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
index 15e1c04..604bf79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.web.resources;
import static org.mockito.Mockito.*;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
@@ -62,6 +63,9 @@ public class TestWebHdfsDataLocality {
private static final String RACK1 = "/rack1";
private static final String RACK2 = "/rack2";
+ private static final String LOCALHOST =
+ InetAddress.getLoopbackAddress().getHostName();
+
@Rule
public final ExpectedException exception = ExpectedException.none();
@@ -96,7 +100,8 @@ public class TestWebHdfsDataLocality {
//The chosen datanode must be the same as the client address
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null);
+ namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null,
+ LOCALHOST);
Assert.assertEquals(ipAddr, chosen.getIpAddr());
}
}
@@ -121,19 +126,22 @@ public class TestWebHdfsDataLocality {
{ //test GETFILECHECKSUM
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null);
+ namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null,
+ LOCALHOST);
Assert.assertEquals(expected, chosen);
}
{ //test OPEN
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null);
+ namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null,
+ LOCALHOST);
Assert.assertEquals(expected, chosen);
}
{ //test APPEND
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null);
+ namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null,
+ LOCALHOST);
Assert.assertEquals(expected, chosen);
}
} finally {
@@ -189,7 +197,7 @@ public class TestWebHdfsDataLocality {
{ // test GETFILECHECKSUM
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
- sb.toString());
+ sb.toString(), LOCALHOST);
for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName());
@@ -198,7 +206,8 @@ public class TestWebHdfsDataLocality {
{ // test OPEN
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString());
+ namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString(),
+ LOCALHOST);
for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName());
@@ -208,7 +217,7 @@ public class TestWebHdfsDataLocality {
{ // test APPEND
final DatanodeInfo chosen = NamenodeWebHdfsMethods
.chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
- blocksize, sb.toString());
+ blocksize, sb.toString(), LOCALHOST);
for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName());
@@ -229,6 +238,6 @@ public class TestWebHdfsDataLocality {
exception.expect(IOException.class);
exception.expectMessage("Namesystem has not been intialized yet.");
NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0,
- DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null);
+ DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org