You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/09/30 21:37:35 UTC
svn commit: r1527741 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/
Author: brandonli
Date: Mon Sep 30 19:37:33 2013
New Revision: 1527741
URL: http://svn.apache.org/r1527741
Log:
HDFS-5230. Merging change r1527726 from trunk
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Mon Sep 30 19:37:33 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.mount
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -38,10 +39,15 @@ import org.apache.hadoop.nfs.nfs3.FileHa
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
/**
* RPC program corresponding to mountd daemon. See {@link Mountd}.
@@ -77,7 +83,7 @@ public class RpcProgramMountd extends Rp
throws IOException {
// Note that RPC cache is not enabled
super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
- PROGRAM, VERSION_1, VERSION_3, 0);
+ PROGRAM, VERSION_1, VERSION_3);
this.hostsMatcher = NfsExports.getInstance(config);
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
@@ -173,10 +179,16 @@ public class RpcProgramMountd extends Rp
}
@Override
- public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out,
- InetAddress client, Channel channel) {
+ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+ RpcCall rpcCall = (RpcCall) info.header();
final MNTPROC mntproc = MNTPROC.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid();
+ byte[] data = new byte[info.data().readableBytes()];
+ info.data().readBytes(data);
+ XDR xdr = new XDR(data);
+ XDR out = new XDR();
+ InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress();
+
if (mntproc == MNTPROC.NULL) {
out = nullOp(out, xid, client);
} else if (mntproc == MNTPROC.MNT) {
@@ -198,7 +210,9 @@ public class RpcProgramMountd extends Rp
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
out);
}
- return out;
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+ RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Mon Sep 30 19:37:33 2013
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.EnumSet;
@@ -103,9 +104,13 @@ import org.apache.hadoop.nfs.nfs3.respon
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcCallCache;
import org.apache.hadoop.oncrpc.RpcDeniedReply;
+import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcReply;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.Credentials;
import org.apache.hadoop.oncrpc.security.CredentialsSys;
@@ -115,7 +120,10 @@ import org.apache.hadoop.oncrpc.security
import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.security.AccessControlException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
/**
* RPC program corresponding to nfs daemon. See {@link Nfs3}.
@@ -150,14 +158,15 @@ public class RpcProgramNfs3 extends RpcP
private Statistics statistics;
private String writeDumpDir; // The dir save dump files
+ private final RpcCallCache rpcCallCache;
+
public RpcProgramNfs3() throws IOException {
this(new Configuration());
}
- public RpcProgramNfs3(Configuration config)
- throws IOException {
+ public RpcProgramNfs3(Configuration config) throws IOException {
super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM,
- Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100);
+ Nfs3Constant.VERSION, Nfs3Constant.VERSION);
config.set(FsPermission.UMASK_LABEL, "000");
iug = new IdUserGroup();
@@ -183,6 +192,8 @@ public class RpcProgramNfs3 extends RpcP
} else {
clearDirectory(writeDumpDir);
}
+
+ rpcCallCache = new RpcCallCache("NFS3", 256);
}
private void clearDirectory(String writeDumpDir) throws IOException {
@@ -213,8 +224,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public GETATTR3Response getattr(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -294,8 +305,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public SETATTR3Response setattr(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) {
@@ -370,8 +381,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public LOOKUP3Response lookup(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -432,8 +443,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public ACCESS3Response access(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -574,7 +585,6 @@ public class RpcProgramNfs3 extends RpcP
long offset = request.getOffset();
int count = request.getCount();
-
FileHandle handle = request.getHandle();
if (LOG.isDebugEnabled()) {
LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset
@@ -720,8 +730,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public CREATE3Response create(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) {
@@ -973,8 +983,7 @@ public class RpcProgramNfs3 extends RpcP
}
String fileIdPath = dirFileIdPath + "/" + fileName;
- HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
- fileIdPath);
+ HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
if (fstat == null) {
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
@@ -1056,8 +1065,7 @@ public class RpcProgramNfs3 extends RpcP
}
String fileIdPath = dirFileIdPath + "/" + fileName;
- HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
- fileIdPath);
+ HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
if (fstat == null) {
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
}
@@ -1098,8 +1106,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public RENAME3Response rename(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) {
@@ -1245,13 +1253,14 @@ public class RpcProgramNfs3 extends RpcP
}
}
- public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) {
+ public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
}
@Override
- public READDIR3Response readdir(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1540,8 +1549,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public FSSTAT3Response fsstat(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1598,8 +1607,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public FSINFO3Response fsinfo(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1650,8 +1659,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public PATHCONF3Response pathconf(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1697,8 +1706,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public COMMIT3Response commit(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public COMMIT3Response commit(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) {
@@ -1776,25 +1785,53 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public XDR handleInternal(RpcCall rpcCall, final XDR xdr, XDR out,
- InetAddress client, Channel channel) {
+ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+ RpcCall rpcCall = (RpcCall) info.header();
final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid();
+ byte[] data = new byte[info.data().readableBytes()];
+ info.data().readBytes(data);
+ XDR xdr = new XDR(data);
+ XDR out = new XDR();
+ InetAddress client = ((InetSocketAddress) info.remoteAddress())
+ .getAddress();
+ Channel channel = info.channel();
Credentials credentials = rpcCall.getCredential();
// Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
if (nfsproc3 != NFSPROC3.NULL) {
- if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS
- && rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) {
- LOG.info("Wrong RPC AUTH flavor, "
- + rpcCall.getCredential().getFlavor()
+ if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
+ && credentials.getFlavor() != AuthFlavor.RPCSEC_GSS) {
+ LOG.info("Wrong RPC AUTH flavor, " + credentials.getFlavor()
+ " is not AUTH_SYS or RPCSEC_GSS.");
XDR reply = new XDR();
RpcDeniedReply rdr = new RpcDeniedReply(xid,
RpcReply.ReplyState.MSG_ACCEPTED,
RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
rdr.write(reply);
- return reply;
+
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
+ return;
+ }
+ }
+
+ if (!isIdempotent(rpcCall)) {
+ RpcCallCache.CacheEntry entry = rpcCallCache.checkOrAddToCache(client,
+ xid);
+ if (entry != null) { // in cache
+ if (entry.isCompleted()) {
+ LOG.info("Sending the cached reply to retransmitted request " + xid);
+ RpcUtil.sendRpcResponse(ctx, entry.getResponse());
+ return;
+ } else { // else request is in progress
+ LOG.info("Retransmitted request, transaction still in progress "
+ + xid);
+ // Ignore the request and do nothing
+ return;
+ }
}
}
@@ -1862,12 +1899,24 @@ public class RpcProgramNfs3 extends RpcP
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
out);
}
- if (response != null) {
- // TODO: currently we just return VerifierNone
- out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
+ if (response == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No sync response, expect an async response for request XID="
+ + rpcCall.getXid());
+ }
+ return;
+ }
+ // TODO: currently we just return VerifierNone
+ out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+
+ if (!isIdempotent(rpcCall)) {
+ rpcCallCache.callCompleted(client, xid, rsp);
}
- return out;
+ RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Sep 30 19:37:33 2013
@@ -101,6 +101,9 @@ Release 2.1.2 - UNRELEASED
NEW FEATURES
+ HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API.
+ (Haohui Mai via brandonli)
+
IMPROVEMENTS
HDFS-5246. Make Hadoop nfs server port and mount daemon port