You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/10/17 07:33:01 UTC
svn commit: r1532967 [2/2] - in
/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project:
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/
hadoop-hdfs-nfs/
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/
hadoop-hdfs-nfs/src/m...
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Thu Oct 17 05:32:42 2013
@@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.EnumSet;
@@ -28,6 +30,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FileUtil;
@@ -42,8 +45,10 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.nfs.AccessPrivilege;
import org.apache.hadoop.nfs.NfsExports;
+import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.NfsTime;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -65,10 +70,12 @@ import org.apache.hadoop.nfs.nfs3.reques
import org.apache.hadoop.nfs.nfs3.request.READ3Request;
import org.apache.hadoop.nfs.nfs3.request.READDIR3Request;
import org.apache.hadoop.nfs.nfs3.request.READDIRPLUS3Request;
+import org.apache.hadoop.nfs.nfs3.request.READLINK3Request;
import org.apache.hadoop.nfs.nfs3.request.REMOVE3Request;
import org.apache.hadoop.nfs.nfs3.request.RENAME3Request;
import org.apache.hadoop.nfs.nfs3.request.RMDIR3Request;
import org.apache.hadoop.nfs.nfs3.request.SETATTR3Request;
+import org.apache.hadoop.nfs.nfs3.request.SYMLINK3Request;
import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
import org.apache.hadoop.nfs.nfs3.request.SetAttr3.SetAttrField;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
@@ -94,24 +101,31 @@ import org.apache.hadoop.nfs.nfs3.respon
import org.apache.hadoop.nfs.nfs3.response.RMDIR3Response;
import org.apache.hadoop.nfs.nfs3.response.SETATTR3Response;
import org.apache.hadoop.nfs.nfs3.response.SYMLINK3Response;
-import org.apache.hadoop.nfs.nfs3.response.VoidResponse;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcCallCache;
import org.apache.hadoop.oncrpc.RpcDeniedReply;
+import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcReply;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
-import org.apache.hadoop.oncrpc.security.CredentialsSys;
import org.apache.hadoop.oncrpc.security.Credentials;
-import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.CredentialsSys;
+import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.apache.hadoop.oncrpc.security.SysSecurityHandler;
-import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
+import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.security.AccessControlException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
/**
* RPC program corresponding to nfs daemon. See {@link Nfs3}.
@@ -121,7 +135,7 @@ public class RpcProgramNfs3 extends RpcP
public static final FsPermission umask = new FsPermission(
(short) DEFAULT_UMASK);
- private static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class);
+ static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class);
private static final int MAX_READ_TRANSFER_SIZE = 64 * 1024;
private static final int MAX_WRITE_TRANSFER_SIZE = 64 * 1024;
private static final int MAX_READDIR_TRANSFER_SIZE = 64 * 1024;
@@ -146,14 +160,15 @@ public class RpcProgramNfs3 extends RpcP
private Statistics statistics;
private String writeDumpDir; // The dir save dump files
+ private final RpcCallCache rpcCallCache;
+
public RpcProgramNfs3() throws IOException {
this(new Configuration());
}
- public RpcProgramNfs3(Configuration config)
- throws IOException {
+ public RpcProgramNfs3(Configuration config) throws IOException {
super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM,
- Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100);
+ Nfs3Constant.VERSION, Nfs3Constant.VERSION);
config.set(FsPermission.UMASK_LABEL, "000");
iug = new IdUserGroup();
@@ -179,6 +194,8 @@ public class RpcProgramNfs3 extends RpcP
} else {
clearDirectory(writeDumpDir);
}
+
+ rpcCallCache = new RpcCallCache("NFS3", 256);
}
private void clearDirectory(String writeDumpDir) throws IOException {
@@ -205,12 +222,12 @@ public class RpcProgramNfs3 extends RpcP
if (LOG.isDebugEnabled()) {
LOG.debug("NFS NULL");
}
- return new VoidResponse(Nfs3Status.NFS3_OK);
+ return new NFS3Response(Nfs3Status.NFS3_OK);
}
@Override
- public GETATTR3Response getattr(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -290,8 +307,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public SETATTR3Response setattr(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) {
@@ -366,8 +383,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public LOOKUP3Response lookup(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -428,8 +445,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public ACCESS3Response access(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -476,9 +493,70 @@ public class RpcProgramNfs3 extends RpcP
}
}
- public READLINK3Response readlink(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
- return new READLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
+ public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
+ READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
+
+ if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+ response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+ return response;
+ }
+
+ DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+ if (dfsClient == null) {
+ response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
+ return response;
+ }
+
+ READLINK3Request request = null;
+
+ try {
+ request = new READLINK3Request(xdr);
+ } catch (IOException e) {
+ LOG.error("Invalid READLINK request");
+ return new READLINK3Response(Nfs3Status.NFS3ERR_INVAL);
+ }
+
+ FileHandle handle = request.getHandle();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NFS READLINK fileId: " + handle.getFileId());
+ }
+
+ String fileIdPath = Nfs3Utils.getFileIdPath(handle);
+ try {
+ String target = dfsClient.getLinkTarget(fileIdPath);
+
+ Nfs3FileAttributes postOpAttr = Nfs3Utils.getFileAttr(dfsClient,
+ fileIdPath, iug);
+ if (postOpAttr == null) {
+ LOG.info("Can't get path for fileId:" + handle.getFileId());
+ return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
+ }
+ if (postOpAttr.getType() != NfsFileType.NFSLNK.toValue()) {
+ LOG.error("Not a symlink, fileId:" + handle.getFileId());
+ return new READLINK3Response(Nfs3Status.NFS3ERR_INVAL);
+ }
+ if (target == null) {
+ LOG.error("Symlink target should not be null, fileId:"
+ + handle.getFileId());
+ return new READLINK3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
+ }
+ if (MAX_READ_TRANSFER_SIZE < target.getBytes().length) {
+ return new READLINK3Response(Nfs3Status.NFS3ERR_IO, postOpAttr, null);
+ }
+
+ return new READLINK3Response(Nfs3Status.NFS3_OK, postOpAttr,
+ target.getBytes());
+
+ } catch (IOException e) {
+ LOG.warn("Readlink error: " + e.getClass(), e);
+ if (e instanceof FileNotFoundException) {
+ return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
+ } else if (e instanceof AccessControlException) {
+ return new READLINK3Response(Nfs3Status.NFS3ERR_ACCES);
+ }
+ return new READLINK3Response(Nfs3Status.NFS3ERR_IO);
+ }
}
@Override
@@ -509,7 +587,6 @@ public class RpcProgramNfs3 extends RpcP
long offset = request.getOffset();
int count = request.getCount();
-
FileHandle handle = request.getHandle();
if (LOG.isDebugEnabled()) {
LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset
@@ -655,8 +732,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public CREATE3Response create(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) {
@@ -765,7 +842,7 @@ public class RpcProgramNfs3 extends RpcP
// Add open stream
OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir
- + "/" + postOpObjAttr.getFileId());
+ + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
fileHandle = new FileHandle(postOpObjAttr.getFileId());
writeManager.addOpenFileStream(fileHandle, openFileCtx);
if (LOG.isDebugEnabled()) {
@@ -908,8 +985,7 @@ public class RpcProgramNfs3 extends RpcP
}
String fileIdPath = dirFileIdPath + "/" + fileName;
- HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
- fileIdPath);
+ HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
if (fstat == null) {
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
@@ -991,8 +1067,7 @@ public class RpcProgramNfs3 extends RpcP
}
String fileIdPath = dirFileIdPath + "/" + fileName;
- HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
- fileIdPath);
+ HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
if (fstat == null) {
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
}
@@ -1033,8 +1108,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public RENAME3Response rename(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) {
@@ -1121,18 +1196,96 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public SYMLINK3Response symlink(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
- return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
+ public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
+ SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
+
+ if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+ response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+ return response;
+ }
+
+ DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+ if (dfsClient == null) {
+ response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
+ return response;
+ }
+
+ SYMLINK3Request request = null;
+ try {
+ request = new SYMLINK3Request(xdr);
+ } catch (IOException e) {
+ LOG.error("Invalid SYMLINK request");
+ response.setStatus(Nfs3Status.NFS3ERR_INVAL);
+ return response;
+ }
+
+ FileHandle dirHandle = request.getHandle();
+ String name = request.getName();
+ String symData = request.getSymData();
+ String linkDirIdPath = Nfs3Utils.getFileIdPath(dirHandle);
+ // Don't do any name checking on the source path; just leave it to HDFS
+ String linkIdPath = linkDirIdPath + "/" + name;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NFS SYMLINK, target: " + symData + " link: " + linkIdPath);
+ }
+
+ try {
+ WccData dirWcc = response.getDirWcc();
+ WccAttr preOpAttr = Nfs3Utils.getWccAttr(dfsClient, linkDirIdPath);
+ dirWcc.setPreOpAttr(preOpAttr);
+
+ dfsClient.createSymlink(symData, linkIdPath, false);
+ // Setting attributes on a symlink is treated as changing the attributes of
+ // the target file, so there is no need to set them here after it's created.
+
+ HdfsFileStatus linkstat = dfsClient.getFileLinkInfo(linkIdPath);
+ Nfs3FileAttributes objAttr = Nfs3Utils.getNfs3FileAttrFromFileStatus(
+ linkstat, iug);
+ dirWcc
+ .setPostOpAttr(Nfs3Utils.getFileAttr(dfsClient, linkDirIdPath, iug));
+
+ return new SYMLINK3Response(Nfs3Status.NFS3_OK, new FileHandle(
+ objAttr.getFileid()), objAttr, dirWcc);
+
+ } catch (IOException e) {
+ LOG.warn("Exception:" + e);
+ response.setStatus(Nfs3Status.NFS3ERR_IO);
+ return response;
+ }
}
- public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) {
+ public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
}
+ /**
+ * Used by readdir and readdirplus to get dirents. It retries the listing if
+ * the startAfter can't be found anymore.
+ */
+ private DirectoryListing listPaths(DFSClient dfsClient, String dirFileIdPath,
+ byte[] startAfter) throws IOException {
+ DirectoryListing dlisting = null;
+ try {
+ dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
+ } catch (RemoteException e) {
+ IOException io = e.unwrapRemoteException();
+ if (!(io instanceof DirectoryListingStartAfterNotFoundException)) {
+ throw io;
+ }
+ // This happens when startAfter was just deleted
+ LOG.info("Cookie cound't be found: " + new String(startAfter)
+ + ", do listing from beginning");
+ dlisting = dfsClient
+ .listPaths(dirFileIdPath, HdfsFileStatus.EMPTY_NAME);
+ }
+ return dlisting;
+ }
+
@Override
- public READDIR3Response readdir(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1170,7 +1323,7 @@ public class RpcProgramNfs3 extends RpcP
+ cookie + " count: " + count);
}
- HdfsFileStatus dirStatus;
+ HdfsFileStatus dirStatus = null;
DirectoryListing dlisting = null;
Nfs3FileAttributes postOpAttr = null;
long dotdotFileId = 0;
@@ -1214,8 +1367,8 @@ public class RpcProgramNfs3 extends RpcP
String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
startAfter = inodeIdPath.getBytes();
}
- dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
-
+
+ dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
postOpAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
if (postOpAttr == null) {
LOG.error("Can't get path for fileId:" + handle.getFileId());
@@ -1298,11 +1451,15 @@ public class RpcProgramNfs3 extends RpcP
}
long dirCount = request.getDirCount();
if (dirCount <= 0) {
- LOG.info("Nonpositive count in invalid READDIRPLUS request:" + dirCount);
- return new READDIRPLUS3Response(Nfs3Status.NFS3_OK);
+ LOG.info("Nonpositive dircount in invalid READDIRPLUS request:" + dirCount);
+ return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
}
int maxCount = request.getMaxCount();
-
+ if (maxCount <= 0) {
+ LOG.info("Nonpositive maxcount in invalid READDIRPLUS request:" + maxCount);
+ return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: "
+ cookie + " dirCount: " + dirCount + " maxCount: " + maxCount);
@@ -1352,8 +1509,8 @@ public class RpcProgramNfs3 extends RpcP
String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
startAfter = inodeIdPath.getBytes();
}
- dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
-
+
+ dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
if (postOpDirAttr == null) {
LOG.info("Can't get path for fileId:" + handle.getFileId());
@@ -1421,8 +1578,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public FSSTAT3Response fsstat(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1479,8 +1636,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public FSINFO3Response fsinfo(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1531,8 +1688,8 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public PATHCONF3Response pathconf(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
+ InetAddress client) {
PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1578,7 +1735,7 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public COMMIT3Response commit(XDR xdr,
+ public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
SecurityHandler securityHandler, InetAddress client) {
COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
@@ -1620,18 +1777,10 @@ public class RpcProgramNfs3 extends RpcP
long commitOffset = (request.getCount() == 0) ? 0
: (request.getOffset() + request.getCount());
- int status;
- if (writeManager.handleCommit(handle, commitOffset)) {
- status = Nfs3Status.NFS3_OK;
- } else {
- status = Nfs3Status.NFS3ERR_IO;
- }
- Nfs3FileAttributes postOpAttr = writeManager.getFileAttr(dfsClient,
- handle, iug);
- WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
- return new COMMIT3Response(status, fileWcc,
- Nfs3Constant.WRITE_COMMIT_VERF);
-
+ // Insert commit as an async request
+ writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
+ preOpAttr);
+ return null;
} catch (IOException e) {
LOG.warn("Exception ", e);
Nfs3FileAttributes postOpAttr = null;
@@ -1657,24 +1806,53 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public XDR handleInternal(RpcCall rpcCall, final XDR xdr, XDR out,
- InetAddress client, Channel channel) {
+ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+ RpcCall rpcCall = (RpcCall) info.header();
final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid();
+ byte[] data = new byte[info.data().readableBytes()];
+ info.data().readBytes(data);
+ XDR xdr = new XDR(data);
+ XDR out = new XDR();
+ InetAddress client = ((InetSocketAddress) info.remoteAddress())
+ .getAddress();
+ Channel channel = info.channel();
Credentials credentials = rpcCall.getCredential();
// Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
if (nfsproc3 != NFSPROC3.NULL) {
- if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS
- && rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) {
- LOG.info("Wrong RPC AUTH flavor, "
- + rpcCall.getCredential().getFlavor()
+ if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
+ && credentials.getFlavor() != AuthFlavor.RPCSEC_GSS) {
+ LOG.info("Wrong RPC AUTH flavor, " + credentials.getFlavor()
+ " is not AUTH_SYS or RPCSEC_GSS.");
XDR reply = new XDR();
- reply = RpcDeniedReply.voidReply(reply, xid,
+ RpcDeniedReply rdr = new RpcDeniedReply(xid,
RpcReply.ReplyState.MSG_ACCEPTED,
- RpcDeniedReply.RejectState.AUTH_ERROR);
- return reply;
+ RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
+ rdr.write(reply);
+
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
+ return;
+ }
+ }
+
+ if (!isIdempotent(rpcCall)) {
+ RpcCallCache.CacheEntry entry = rpcCallCache.checkOrAddToCache(client,
+ xid);
+ if (entry != null) { // in cache
+ if (entry.isCompleted()) {
+ LOG.info("Sending the cached reply to retransmitted request " + xid);
+ RpcUtil.sendRpcResponse(ctx, entry.getResponse());
+ return;
+ } else { // else request is in progress
+ LOG.info("Retransmitted request, transaction still in progress "
+ + xid);
+ // Ignore the request and do nothing
+ return;
+ }
}
}
@@ -1695,9 +1873,19 @@ public class RpcProgramNfs3 extends RpcP
} else if (nfsproc3 == NFSPROC3.READLINK) {
response = readlink(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.READ) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Nfs3Utils.READ_RPC_START + xid);
+ }
response = read(xdr, securityHandler, client);
+ if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
+ LOG.debug(Nfs3Utils.READ_RPC_END + xid);
+ }
} else if (nfsproc3 == NFSPROC3.WRITE) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
+ }
response = write(xdr, channel, xid, securityHandler, client);
+ // Write end debug trace is in Nfs3Utils.writeChannel
} else if (nfsproc3 == NFSPROC3.CREATE) {
response = create(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.MKDIR) {
@@ -1725,16 +1913,31 @@ public class RpcProgramNfs3 extends RpcP
} else if (nfsproc3 == NFSPROC3.PATHCONF) {
response = pathconf(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.COMMIT) {
- response = commit(xdr, securityHandler, client);
+ response = commit(xdr, channel, xid, securityHandler, client);
} else {
// Invalid procedure
- RpcAcceptedReply.voidReply(out, xid,
- RpcAcceptedReply.AcceptState.PROC_UNAVAIL);
+ RpcAcceptedReply.getInstance(xid,
+ RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
+ out);
+ }
+ if (response == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No sync response, expect an async response for request XID="
+ + rpcCall.getXid());
+ }
+ return;
}
- if (response != null) {
- out = response.send(out, xid);
+ // TODO: currently we just return VerifierNone
+ out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+
+ if (!isIdempotent(rpcCall)) {
+ rpcCallCache.callCompleted(client, xid, rsp);
}
- return out;
+
+ RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java Thu Oct 17 05:32:42 2013
@@ -20,13 +20,18 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.jboss.netty.channel.Channel;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
/**
* WriteCtx saves the context of one write request, such as request, channel,
* xid and reply status.
@@ -48,14 +53,31 @@ class WriteCtx {
private final FileHandle handle;
private final long offset;
private final int count;
+
+ // Only needed for overlapped writes; see OpenFileCtx.addWritesToCache()
+ private final int originalCount;
+ public static final int INVALID_ORIGINAL_COUNT = -1;
+
+ public int getOriginalCount() {
+ return originalCount;
+ }
+
private final WriteStableHow stableHow;
- private byte[] data;
+ private volatile ByteBuffer data;
private final Channel channel;
private final int xid;
private boolean replied;
- private DataState dataState;
+ /**
+ * Data belonging to the same {@link OpenFileCtx} may be dumped to a file.
+ * After being dumped to the file, the corresponding {@link WriteCtx} records
+ * the dump file and the offset.
+ */
+ private RandomAccessFile raf;
+ private long dumpFileOffset;
+
+ private volatile DataState dataState;
public DataState getDataState() {
return dataState;
@@ -64,12 +86,13 @@ class WriteCtx {
public void setDataState(DataState dataState) {
this.dataState = dataState;
}
-
- private RandomAccessFile raf;
- private long dumpFileOffset;
- // Return the dumped data size
- public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
+ /**
+ * Writes the data into a local file. After the write completes, if
+ * {@link #dataState} is still ALLOW_DUMP, sets {@link #data} to null and sets
+ * {@link #dataState} to DUMPED.
+ */
+ long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
throws IOException {
if (dataState != DataState.ALLOW_DUMP) {
if (LOG.isTraceEnabled()) {
@@ -78,54 +101,104 @@ class WriteCtx {
}
return 0;
}
+
+ // Resized write should not allow dump
+ Preconditions.checkState(originalCount == INVALID_ORIGINAL_COUNT);
+
this.raf = raf;
dumpFileOffset = dumpOut.getChannel().position();
- dumpOut.write(data, 0, count);
+ dumpOut.write(data.array(), 0, count);
if (LOG.isDebugEnabled()) {
LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
}
- data = null;
- dataState = DataState.DUMPED;
- return count;
+ // it is possible that while we dump the data, the data is also being
+ // written back to HDFS. After dump, if the writing back has not finished
+ // yet, we change its flag to DUMPED and set the data to null. Otherwise
+ // this WriteCtx instance should have been removed from the buffer.
+ if (dataState == DataState.ALLOW_DUMP) {
+ synchronized (this) {
+ if (dataState == DataState.ALLOW_DUMP) {
+ data = null;
+ dataState = DataState.DUMPED;
+ return count;
+ }
+ }
+ }
+ return 0;
}
- public FileHandle getHandle() {
+ FileHandle getHandle() {
return handle;
}
- public long getOffset() {
+ long getOffset() {
return offset;
}
- public int getCount() {
+ int getCount() {
return count;
}
- public WriteStableHow getStableHow() {
+ WriteStableHow getStableHow() {
return stableHow;
}
- public byte[] getData() throws IOException {
+ @VisibleForTesting
+ ByteBuffer getData() throws IOException {
if (dataState != DataState.DUMPED) {
- if (data == null) {
- throw new IOException("Data is not dumpted but has null:" + this);
- }
- } else {
- // read back
- if (data != null) {
- throw new IOException("Data is dumpted but not null");
- }
- data = new byte[count];
- raf.seek(dumpFileOffset);
- int size = raf.read(data, 0, count);
- if (size != count) {
- throw new IOException("Data count is " + count + ", but read back "
- + size + "bytes");
+ synchronized (this) {
+ if (dataState != DataState.DUMPED) {
+ Preconditions.checkState(data != null);
+ return data;
+ }
}
}
+ // read back from dumped file
+ this.loadData();
return data;
}
+ private void loadData() throws IOException {
+ Preconditions.checkState(data == null);
+ byte[] rawData = new byte[count];
+ raf.seek(dumpFileOffset);
+ int size = raf.read(rawData, 0, count);
+ if (size != count) {
+ throw new IOException("Data count is " + count + ", but read back "
+ + size + "bytes");
+ }
+ data = ByteBuffer.wrap(rawData);
+ }
+
+ public void writeData(HdfsDataOutputStream fos) throws IOException {
+ Preconditions.checkState(fos != null);
+
+ ByteBuffer dataBuffer = null;
+ try {
+ dataBuffer = getData();
+ } catch (Exception e1) {
+ LOG.error("Failed to get request data offset:" + offset + " count:"
+ + count + " error:" + e1);
+ throw new IOException("Can't get WriteCtx.data");
+ }
+
+ byte[] data = dataBuffer.array();
+ int position = dataBuffer.position();
+ int limit = dataBuffer.limit();
+ Preconditions.checkState(limit - position == count);
+ // Modified write has a valid original count
+ if (position != 0) {
+ if (limit != getOriginalCount()) {
+ throw new IOException("Modified write has differnt original size."
+ + "buff position:" + position + " buff limit:" + limit + ". "
+ + toString());
+ }
+ }
+
+ // Now write data
+ fos.write(data, position, count);
+ }
+
Channel getChannel() {
return channel;
}
@@ -142,11 +215,13 @@ class WriteCtx {
this.replied = replied;
}
- WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow,
- byte[] data, Channel channel, int xid, boolean replied, DataState dataState) {
+ WriteCtx(FileHandle handle, long offset, int count, int originalCount,
+ WriteStableHow stableHow, ByteBuffer data, Channel channel, int xid,
+ boolean replied, DataState dataState) {
this.handle = handle;
this.offset = offset;
this.count = count;
+ this.originalCount = originalCount;
this.stableHow = stableHow;
this.data = data;
this.channel = channel;
@@ -159,7 +234,7 @@ class WriteCtx {
@Override
public String toString() {
return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count
- + " stableHow:" + stableHow + " replied:" + replied + " dataState:"
- + dataState + " xid:" + xid;
+ + " originalCount:" + originalCount + " stableHow:" + stableHow
+ + " replied:" + replied + " dataState:" + dataState + " xid:" + xid;
}
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Thu Oct 17 05:32:42 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -36,9 +37,11 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel;
@@ -66,8 +69,8 @@ public class WriteManager {
*/
private long streamTimeout;
- public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second
- public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second
+ public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
+ public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
openFileMap.put(h, ctx);
@@ -118,7 +121,8 @@ public class WriteManager {
byte[] data = request.getData().array();
if (data.length < count) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+ new XDR(), xid, new VerifierNone()), xid);
return;
}
@@ -155,7 +159,8 @@ public class WriteManager {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
fileWcc, count, request.getStableHow(),
Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+ new XDR(), xid, new VerifierNone()), xid);
return;
}
@@ -163,7 +168,7 @@ public class WriteManager {
String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
- + fileHandle.getFileId());
+ + fileHandle.getFileId(), dfsClient, iug);
addOpenFileStream(fileHandle, openFileCtx);
if (LOG.isDebugEnabled()) {
LOG.debug("opened stream for file:" + fileHandle.getFileId());
@@ -173,65 +178,55 @@ public class WriteManager {
// Add write into the async job queue
openFileCtx.receivedNewWrite(dfsClient, request, channel, xid,
asyncDataService, iug);
- // Block stable write
- if (request.getStableHow() != WriteStableHow.UNSTABLE) {
- if (handleCommit(fileHandle, offset + count)) {
- Nfs3FileAttributes postOpAttr = getFileAttr(dfsClient, handle, iug);
- WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
- postOpAttr);
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
- fileWcc, count, request.getStableHow(),
- Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
- } else {
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
- }
- }
-
return;
}
- boolean handleCommit(FileHandle fileHandle, long commitOffset) {
+ void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
+ long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+ int status;
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+
if (openFileCtx == null) {
LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
- + " commitOffset=" + commitOffset);
- return true;
- }
- long timeout = 30 * 1000; // 30 seconds
- long startCommit = System.currentTimeMillis();
- while (true) {
- int ret = openFileCtx.checkCommit(commitOffset);
- if (ret == OpenFileCtx.COMMIT_FINISHED) {
- // Committed
- return true;
- } else if (ret == OpenFileCtx.COMMIT_INACTIVE_CTX) {
- LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
- + " commitOffset=" + commitOffset);
- return true;
- }
- assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
- if (ret == OpenFileCtx.COMMIT_ERROR) {
- return false;
- }
+ + " commitOffset=" + commitOffset + ". Return success in this case.");
+ status = Nfs3Status.NFS3_OK;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not committed yet, wait., fileId=" + fileHandle.getFileId()
- + " commitOffset=" + commitOffset);
- }
- if (System.currentTimeMillis() - startCommit > timeout) {
- // Commit took too long, return error
- return false;
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- LOG.info("Commit is interrupted, fileId=" + fileHandle.getFileId()
- + " commitOffset=" + commitOffset);
- return false;
+ } else {
+ COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset,
+ channel, xid, preOpAttr);
+ switch (ret) {
+ case COMMIT_DO_SYNC:
+ case COMMIT_FINISHED:
+ case COMMIT_INACTIVE_CTX:
+ status = Nfs3Status.NFS3_OK;
+ break;
+ case COMMIT_INACTIVE_WITH_PENDING_WRITE:
+ case COMMIT_ERROR:
+ status = Nfs3Status.NFS3ERR_IO;
+ break;
+ case COMMIT_WAIT:
+ // Do nothing. Commit is async now.
+ return;
+ default:
+ throw new RuntimeException("Should not get commit return code:"
+ + ret.name());
}
- }// while
+ }
+
+ // Send out the response
+ Nfs3FileAttributes postOpAttr = null; // stays null if the lookup below fails
+ try {
+ String fileIdPath = Nfs3Utils.getFileIdPath(preOpAttr.getFileid());
+ postOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+ } catch (IOException e1) {
+ LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileid(), e1);
+ }
+ WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
+ COMMIT3Response response = new COMMIT3Response(status, fileWcc,
+ Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannelCommit(channel,
+ response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+ xid);
}
/**
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java Thu Oct 17 05:32:42 2013
@@ -33,11 +33,13 @@ import org.apache.hadoop.nfs.nfs3.reques
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.oncrpc.RegistrationClient;
import org.apache.hadoop.oncrpc.RpcCall;
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.RpcReply;
+import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.SimpleTcpClient;
import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -58,15 +60,9 @@ public class TestOutOfOrderWrite {
static XDR create() {
XDR request = new XDR();
- RpcCall.write(request, 0x8000004c, Nfs3Constant.PROGRAM,
- Nfs3Constant.VERSION, Nfs3Constant.NFSPROC3.CREATE.getValue());
-
- // credentials
- request.writeInt(0); // auth null
- request.writeInt(0); // length zero
- // verifier
- request.writeInt(0); // auth null
- request.writeInt(0); // length zero
+ RpcCall.getInstance(0x8000004c, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION,
+ Nfs3Constant.NFSPROC3.CREATE.getValue(), new CredentialsNone(),
+ new VerifierNone()).write(request);
SetAttr3 objAttr = new SetAttr3();
CREATE3Request createReq = new CREATE3Request(new FileHandle("/"),
@@ -78,15 +74,10 @@ public class TestOutOfOrderWrite {
static XDR write(FileHandle handle, int xid, long offset, int count,
byte[] data) {
XDR request = new XDR();
- RpcCall.write(request, xid, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION,
- Nfs3Constant.NFSPROC3.WRITE.getValue());
+ RpcCall.getInstance(xid, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION,
+ Nfs3Constant.NFSPROC3.WRITE.getValue(), new CredentialsNone(),
+ new VerifierNone()).write(request); // WRITE: a WRITE3Request follows
- // credentials
- request.writeInt(0); // auth null
- request.writeInt(0); // length zero
- // verifier
- request.writeInt(0); // auth null
- request.writeInt(0); // length zero
WRITE3Request write1 = new WRITE3Request(handle, offset, count,
WriteStableHow.UNSTABLE, ByteBuffer.wrap(data));
write1.serialize(request);
@@ -145,8 +136,9 @@ public class TestOutOfOrderWrite {
protected ChannelPipelineFactory setPipelineFactory() {
this.pipelineFactory = new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
- return Channels.pipeline(new RpcFrameDecoder(), new WriteHandler(
- request));
+ return Channels.pipeline(
+ RpcUtil.constructRpcFrameDecoder(),
+ new WriteHandler(request));
}
};
return this.pipelineFactory;
@@ -174,11 +166,11 @@ public class TestOutOfOrderWrite {
XDR writeReq;
writeReq = write(handle, 0x8000005c, 2000, 1000, data3);
- Nfs3Utils.writeChannel(channel, writeReq);
+ Nfs3Utils.writeChannel(channel, writeReq, 1);
writeReq = write(handle, 0x8000005d, 1000, 1000, data2);
- Nfs3Utils.writeChannel(channel, writeReq);
+ Nfs3Utils.writeChannel(channel, writeReq, 2);
writeReq = write(handle, 0x8000005e, 0, 1000, data1);
- Nfs3Utils.writeChannel(channel, writeReq);
+ Nfs3Utils.writeChannel(channel, writeReq, 3);
// TODO: convert to Junit test, and validate result automatically
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java Thu Oct 17 05:32:42 2013
@@ -26,6 +26,8 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
import org.apache.hadoop.oncrpc.RegistrationClient;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
@@ -78,11 +80,8 @@ public class TestPortmapRegister {
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
// TODO: Move this to RpcRequest
- RpcCall.write(xdr_out, 0, 100000, 2, procedure);
- xdr_out.writeInt(0); //no auth
- xdr_out.writeInt(0);
- xdr_out.writeInt(0);
- xdr_out.writeInt(0);
+ RpcCall.getInstance(0, 100000, 2, procedure, new CredentialsNone(),
+ new VerifierNone()).write(xdr_out);
/*
xdr_out.putInt(1); //unix auth
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java Thu Oct 17 05:32:42 2013
@@ -27,6 +27,8 @@ import java.net.UnknownHostException;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
// TODO: convert this to Junit
public class TestUdpServer {
@@ -82,7 +84,8 @@ public class TestUdpServer {
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
// Make this a method
- RpcCall.write(xdr_out, 0, 100000, 2, procedure);
+ RpcCall.getInstance(0, 100000, 2, procedure, new CredentialsNone(),
+ new VerifierNone()).write(xdr_out);
}
static void testGetportMount() {
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java Thu Oct 17 05:32:42 2013
@@ -17,41 +17,44 @@
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSClient;
import org.junit.Test;
-import org.mockito.Mockito;
public class TestDFSClientCache {
@Test
- public void testLruTable() throws IOException {
- DFSClientCache cache = new DFSClientCache(new Configuration(), 3);
- DFSClient client = Mockito.mock(DFSClient.class);
- cache.put("a", client);
- assertTrue(cache.containsKey("a"));
-
- cache.put("b", client);
- cache.put("c", client);
- cache.put("d", client);
- assertTrue(cache.usedSize() == 3);
- assertFalse(cache.containsKey("a"));
-
- // Cache should have d,c,b in LRU order
- assertTrue(cache.containsKey("b"));
- // Do a lookup to make b the most recently used
- assertTrue(cache.get("b") != null);
-
- cache.put("e", client);
- assertTrue(cache.usedSize() == 3);
- // c should be replaced with e, and cache has e,b,d
- assertFalse(cache.containsKey("c"));
- assertTrue(cache.containsKey("e"));
- assertTrue(cache.containsKey("b"));
- assertTrue(cache.containsKey("d"));
+ public void testEviction() throws IOException {
+ Configuration conf = new Configuration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
+
+ // MAX_CACHE_SIZE is 2, but the assertions below expect only one cached entry
+ final int MAX_CACHE_SIZE = 2;
+
+ DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
+
+ DFSClient c1 = cache.get("test1");
+ assertTrue(cache.get("test1").toString().contains("ugi=test1"));
+ assertEquals(c1, cache.get("test1"));
+ assertFalse(isDfsClientClose(c1));
+
+ cache.get("test2");
+ assertTrue(isDfsClientClose(c1));
+ assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
+ }
+
+ private static boolean isDfsClientClose(DFSClient c) {
+ try {
+ c.exists("");
+ } catch (IOException e) {
+ return "Filesystem closed".equals(e.getMessage()); // getMessage() may be null
+ }
+ return false;
}
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java Thu Oct 17 05:32:42 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
@@ -51,8 +52,9 @@ public class TestOffsetRange {
OffsetRange r3 = new OffsetRange(1, 3);
OffsetRange r4 = new OffsetRange(3, 4);
- assertTrue(r2.compareTo(r3) == 0);
- assertTrue(r2.compareTo(r1) == 1);
- assertTrue(r2.compareTo(r4) == -1);
+ assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r3));
+ assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r2));
+ assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r1) < 0);
+ assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r4) > 0);
}
}