You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC
svn commit: r1619012 [4/35] - in
/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Tue Aug 19 23:49:39 2014
@@ -20,14 +20,15 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@@ -40,10 +41,11 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.nfs.AccessPrivilege;
import org.apache.hadoop.nfs.NfsExports;
@@ -123,6 +125,7 @@ import org.apache.hadoop.oncrpc.security
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -130,9 +133,6 @@ import org.jboss.netty.channel.ChannelHa
import com.google.common.annotations.VisibleForTesting;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NFS_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY;
-
/**
* RPC program corresponding to nfs daemon. See {@link Nfs3}.
*/
@@ -140,43 +140,43 @@ public class RpcProgramNfs3 extends RpcP
public static final int DEFAULT_UMASK = 0022;
public static final FsPermission umask = new FsPermission(
(short) DEFAULT_UMASK);
-
+
static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class);
- private final Configuration config = new Configuration();
+ private final NfsConfiguration config;
private final WriteManager writeManager;
private final IdUserGroup iug;
private final DFSClientCache clientCache;
private final NfsExports exports;
-
- /**
- * superUserClient should always impersonate HDFS file system owner to send
- * requests which requires supergroup privilege. This requires the same user
- * to start HDFS and NFS.
- */
- private final DFSClient superUserClient;
-
+
private final short replication;
private final long blockSize;
private final int bufferSize;
+ private final boolean aixCompatMode;
private Statistics statistics;
private String writeDumpDir; // The dir save dump files
-
+
private final RpcCallCache rpcCallCache;
- public RpcProgramNfs3(Configuration config) throws IOException {
- super("NFS3", "localhost", config.getInt(Nfs3Constant.NFS3_SERVER_PORT,
- Nfs3Constant.NFS3_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
- Nfs3Constant.VERSION, Nfs3Constant.VERSION);
-
+ public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket,
+ boolean allowInsecurePorts) throws IOException {
+ super("NFS3", "localhost", config.getInt(
+ NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
+ NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
+ Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
+ allowInsecurePorts);
+
+ this.config = config;
config.set(FsPermission.UMASK_LABEL, "000");
- iug = new IdUserGroup();
-
+ iug = new IdUserGroup(config);
+
+ aixCompatMode = config.getBoolean(
+ NfsConfigKeys.AIX_COMPAT_MODE_KEY,
+ NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
exports = NfsExports.getInstance(config);
- writeManager = new WriteManager(iug, config);
+ writeManager = new WriteManager(iug, config, aixCompatMode);
clientCache = new DFSClientCache(config);
- superUserClient = new DFSClient(NameNode.getAddress(config), config);
replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
blockSize = config.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
@@ -184,14 +184,14 @@ public class RpcProgramNfs3 extends RpcP
bufferSize = config.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-
- writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
- Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
- boolean enableDump = config.getBoolean(Nfs3Constant.ENABLE_FILE_DUMP_KEY,
- Nfs3Constant.ENABLE_FILE_DUMP_DEFAULT);
+
+ writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
+ NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
+ boolean enableDump = config.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY,
+ NfsConfigKeys.DFS_NFS_FILE_DUMP_DEFAULT);
UserGroupInformation.setConfiguration(config);
- SecurityUtil.login(config, DFS_NFS_KEYTAB_FILE_KEY,
- DFS_NFS_KERBEROS_PRINCIPAL_KEY);
+ SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY,
+ NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY);
if (!enableDump) {
writeDumpDir = null;
@@ -216,12 +216,23 @@ public class RpcProgramNfs3 extends RpcP
throw new IOException("Cannot create dump directory " + dumpDir);
}
}
-
+
@Override
public void startDaemons() {
writeManager.startAsyncDataSerivce();
}
-
+
+ // Checks the type of IOException and maps it to appropriate Nfs3Status code.
+ private int mapErrorStatus(IOException e) {
+ if (e instanceof FileNotFoundException) {
+ return Nfs3Status.NFS3ERR_STALE;
+ } else if (e instanceof AccessControlException) {
+ return Nfs3Status.NFS3ERR_ACCES;
+ } else {
+ return Nfs3Status.NFS3ERR_IO;
+ }
+ }
+
/******************************************************
* RPC call handlers
******************************************************/
@@ -235,21 +246,26 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public GETATTR3Response getattr(XDR xdr, RpcInfo info) {
+ return getattr(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
-
+
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
GETATTR3Request request = null;
try {
request = new GETATTR3Request(xdr);
@@ -267,9 +283,21 @@ public class RpcProgramNfs3 extends RpcP
Nfs3FileAttributes attrs = null;
try {
attrs = writeManager.getFileAttr(dfsClient, handle, iug);
+ } catch (RemoteException r) {
+ LOG.warn("Exception ", r);
+ IOException io = r.unwrapRemoteException();
+ /**
+ * AuthorizationException can be thrown if the user can't be proxy'ed.
+ */
+ if (io instanceof AuthorizationException) {
+ return new GETATTR3Response(Nfs3Status.NFS3ERR_ACCES);
+ } else {
+ return new GETATTR3Response(Nfs3Status.NFS3ERR_IO);
+ }
} catch (IOException e) {
LOG.info("Can't get file attribute, fileId=" + handle.getFileId(), e);
- response.setStatus(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ response.setStatus(status);
return response;
}
if (attrs == null) {
@@ -286,7 +314,7 @@ public class RpcProgramNfs3 extends RpcP
private void setattrInternal(DFSClient dfsClient, String fileIdPath,
SetAttr3 newAttr, boolean setMode) throws IOException {
EnumSet<SetAttrField> updateFields = newAttr.getUpdateFields();
-
+
if (setMode && updateFields.contains(SetAttrField.MODE)) {
if (LOG.isDebugEnabled()) {
LOG.debug("set new mode:" + newAttr.getMode());
@@ -316,15 +344,20 @@ public class RpcProgramNfs3 extends RpcP
}
@Override
- public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public SETATTR3Response setattr(XDR xdr, RpcInfo info) {
+ return setattr(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
SETATTR3Request request = null;
try {
request = new SETATTR3Request(xdr);
@@ -362,9 +395,9 @@ public class RpcProgramNfs3 extends RpcP
return new SETATTR3Response(Nfs3Status.NFS3ERR_NOT_SYNC, wccData);
}
}
-
+
// check the write access privilege
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
preOpWcc, preOpAttr));
}
@@ -383,30 +416,33 @@ public class RpcProgramNfs3 extends RpcP
} catch (IOException e1) {
LOG.info("Can't get postOpAttr for fileIdPath: " + fileIdPath, e1);
}
- if (e instanceof AccessControlException) {
- return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, wccData);
- } else {
- return new SETATTR3Response(Nfs3Status.NFS3ERR_IO, wccData);
- }
+
+ int status = mapErrorStatus(e);
+ return new SETATTR3Response(status, wccData);
}
}
@Override
- public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public LOOKUP3Response lookup(XDR xdr, RpcInfo info) {
+ return lookup(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
-
+
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
LOOKUP3Request request = null;
try {
request = new LOOKUP3Request(xdr);
@@ -449,26 +485,32 @@ public class RpcProgramNfs3 extends RpcP
} catch (IOException e) {
LOG.warn("Exception ", e);
- return new LOOKUP3Response(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ return new LOOKUP3Response(status);
}
}
-
+
@Override
- public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public ACCESS3Response access(XDR xdr, RpcInfo info) {
+ return access(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
-
+
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
ACCESS3Request request = null;
try {
request = new ACCESS3Request(xdr);
@@ -482,7 +524,7 @@ public class RpcProgramNfs3 extends RpcP
if (LOG.isDebugEnabled()) {
LOG.debug("NFS ACCESS fileId: " + handle.getFileId());
- }
+ }
try {
// HDFS-5804 removed supserUserClient access
@@ -493,20 +535,39 @@ public class RpcProgramNfs3 extends RpcP
return new ACCESS3Response(Nfs3Status.NFS3ERR_STALE);
}
int access = Nfs3Utils.getAccessRightsForUserGroup(
- securityHandler.getUid(), securityHandler.getGid(), attrs);
-
+ securityHandler.getUid(), securityHandler.getGid(),
+ securityHandler.getAuxGids(), attrs);
+
return new ACCESS3Response(Nfs3Status.NFS3_OK, attrs, access);
+ } catch (RemoteException r) {
+ LOG.warn("Exception ", r);
+ IOException io = r.unwrapRemoteException();
+ /**
+ * AuthorizationException can be thrown if the user can't be proxy'ed.
+ */
+ if (io instanceof AuthorizationException) {
+ return new ACCESS3Response(Nfs3Status.NFS3ERR_ACCES);
+ } else {
+ return new ACCESS3Response(Nfs3Status.NFS3ERR_IO);
+ }
} catch (IOException e) {
LOG.warn("Exception ", e);
- return new ACCESS3Response(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ return new ACCESS3Response(status);
}
}
- public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ @Override
+ public READLINK3Response readlink(XDR xdr, RpcInfo info) {
+ return readlink(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
@@ -550,8 +611,8 @@ public class RpcProgramNfs3 extends RpcP
+ handle.getFileId());
return new READLINK3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
}
- int rtmax = config.getInt(Nfs3Constant.MAX_READ_TRANSFER_SIZE_KEY,
- Nfs3Constant.MAX_READ_TRANSFER_SIZE_DEFAULT);
+ int rtmax = config.getInt(NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY,
+ NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_DEFAULT);
if (rtmax < target.getBytes().length) {
LOG.error("Link size: " + target.getBytes().length
+ " is larger than max transfer size: " + rtmax);
@@ -564,32 +625,33 @@ public class RpcProgramNfs3 extends RpcP
} catch (IOException e) {
LOG.warn("Readlink error: " + e.getClass(), e);
- if (e instanceof FileNotFoundException) {
- return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
- } else if (e instanceof AccessControlException) {
- return new READLINK3Response(Nfs3Status.NFS3ERR_ACCES);
- }
- return new READLINK3Response(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ return new READLINK3Response(status);
}
}
@Override
- public READ3Response read(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public READ3Response read(XDR xdr, RpcInfo info) {
+ return read(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ READ3Response read(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
final String userName = securityHandler.getUser();
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
-
+
DFSClient dfsClient = clientCache.getDfsClient(userName);
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
READ3Request request = null;
try {
@@ -629,7 +691,8 @@ public class RpcProgramNfs3 extends RpcP
return new READ3Response(Nfs3Status.NFS3ERR_NOENT);
}
int access = Nfs3Utils.getAccessRightsForUserGroup(
- securityHandler.getUid(), securityHandler.getGid(), attrs);
+ securityHandler.getUid(), securityHandler.getGid(),
+ securityHandler.getAuxGids(), attrs);
if ((access & Nfs3Constant.ACCESS3_READ) != 0) {
eof = offset < attrs.getSize() ? false : true;
return new READ3Response(Nfs3Status.NFS3_OK, attrs, 0, eof,
@@ -638,7 +701,7 @@ public class RpcProgramNfs3 extends RpcP
return new READ3Response(Nfs3Status.NFS3ERR_ACCES);
}
}
-
+
// In case there is buffered data for the same file, flush it. This can be
// optimized later by reading from the cache.
int ret = writeManager.commitBeforeRead(dfsClient, handle, offset + count);
@@ -648,8 +711,8 @@ public class RpcProgramNfs3 extends RpcP
}
try {
- int rtmax = config.getInt(Nfs3Constant.MAX_READ_TRANSFER_SIZE_KEY,
- Nfs3Constant.MAX_READ_TRANSFER_SIZE_DEFAULT);
+ int rtmax = config.getInt(NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY,
+ NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_DEFAULT);
int buffSize = Math.min(rtmax, count);
byte[] readbuffer = new byte[buffSize];
@@ -661,6 +724,10 @@ public class RpcProgramNfs3 extends RpcP
FSDataInputStream fis = clientCache.getDfsInputStream(userName,
Nfs3Utils.getFileIdPath(handle));
+ if (fis == null) {
+ return new READ3Response(Nfs3Status.NFS3ERR_ACCES);
+ }
+
try {
readCount = fis.read(offset, readbuffer, 0, count);
} catch (IOException e) {
@@ -693,13 +760,23 @@ public class RpcProgramNfs3 extends RpcP
} catch (IOException e) {
LOG.warn("Read error: " + e.getClass() + " offset: " + offset
+ " count: " + count, e);
- return new READ3Response(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ return new READ3Response(status);
}
}
@Override
- public WRITE3Response write(XDR xdr, Channel channel, int xid,
- SecurityHandler securityHandler, InetAddress client) {
+ public WRITE3Response write(XDR xdr, RpcInfo info) {
+ SecurityHandler securityHandler = getSecurityHandler(info);
+ RpcCall rpcCall = (RpcCall) info.header();
+ int xid = rpcCall.getXid();
+ SocketAddress remoteAddress = info.remoteAddress();
+ return write(xdr, info.channel(), xid, securityHandler, remoteAddress);
+ }
+
+ @VisibleForTesting
+ WRITE3Response write(XDR xdr, Channel channel, int xid,
+ SecurityHandler securityHandler, SocketAddress remoteAddress) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
@@ -707,7 +784,7 @@ public class RpcProgramNfs3 extends RpcP
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
WRITE3Request request = null;
try {
@@ -740,13 +817,13 @@ public class RpcProgramNfs3 extends RpcP
LOG.error("Can't get path for fileId:" + handle.getFileId());
return new WRITE3Response(Nfs3Status.NFS3ERR_STALE);
}
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
return new WRITE3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
Nfs3Utils.getWccAttr(preOpAttr), preOpAttr), 0, stableHow,
Nfs3Constant.WRITE_COMMIT_VERF);
}
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("requesed offset=" + offset + " and current filesize="
+ preOpAttr.getSize());
@@ -766,23 +843,30 @@ public class RpcProgramNfs3 extends RpcP
}
WccAttr attr = preOpAttr == null ? null : Nfs3Utils.getWccAttr(preOpAttr);
WccData fileWcc = new WccData(attr, postOpAttr);
- return new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, 0,
- request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+
+ int status = mapErrorStatus(e);
+ return new WRITE3Response(status, fileWcc, 0, request.getStableHow(),
+ Nfs3Constant.WRITE_COMMIT_VERF);
}
return null;
}
@Override
- public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public CREATE3Response create(XDR xdr, RpcInfo info) {
+ return create(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
CREATE3Request request = null;
try {
@@ -820,8 +904,8 @@ public class RpcProgramNfs3 extends RpcP
LOG.error("Can't get path for dirHandle:" + dirHandle);
return new CREATE3Response(Nfs3Status.NFS3ERR_STALE);
}
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, null,
preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr));
@@ -833,15 +917,15 @@ public class RpcProgramNfs3 extends RpcP
FsPermission permission = setAttr3.getUpdateFields().contains(
SetAttrField.MODE) ? new FsPermission((short) setAttr3.getMode())
: FsPermission.getDefault().applyUMask(umask);
-
+
EnumSet<CreateFlag> flag = (createMode != Nfs3Constant.CREATE_EXCLUSIVE) ?
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) :
EnumSet.of(CreateFlag.CREATE);
-
+
fos = new HdfsDataOutputStream(dfsClient.create(fileIdPath, permission,
flag, false, replication, blockSize, null, bufferSize, null),
statistics);
-
+
if ((createMode == Nfs3Constant.CREATE_UNCHECKED)
|| (createMode == Nfs3Constant.CREATE_GUARDED)) {
// Set group if it's not specified in the request.
@@ -855,10 +939,11 @@ public class RpcProgramNfs3 extends RpcP
postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
dfsClient, dirFileIdPath, iug);
-
+
// Add open stream
OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
- writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
+ writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
+ aixCompatMode);
fileHandle = new FileHandle(postOpObjAttr.getFileId());
if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
LOG.warn("Can't add more stream, close it."
@@ -871,7 +956,7 @@ public class RpcProgramNfs3 extends RpcP
+ fileHandle.getFileId());
}
}
-
+
} catch (IOException e) {
LOG.error("Exception", e);
if (fos != null) {
@@ -891,29 +976,30 @@ public class RpcProgramNfs3 extends RpcP
+ dirHandle.getFileId(), e1);
}
}
- if (e instanceof AccessControlException) {
- return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, fileHandle,
- postOpObjAttr, dirWcc);
- } else {
- return new CREATE3Response(Nfs3Status.NFS3ERR_IO, fileHandle,
- postOpObjAttr, dirWcc);
- }
+
+ int status = mapErrorStatus(e);
+ return new CREATE3Response(status, fileHandle, postOpObjAttr, dirWcc);
}
-
+
return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr,
dirWcc);
}
@Override
- public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public MKDIR3Response mkdir(XDR xdr, RpcInfo info) {
+ return mkdir(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
MKDIR3Request request = null;
try {
@@ -943,11 +1029,11 @@ public class RpcProgramNfs3 extends RpcP
return new MKDIR3Response(Nfs3Status.NFS3ERR_STALE);
}
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
return new MKDIR3Response(Nfs3Status.NFS3ERR_ACCES, null, preOpDirAttr,
new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr));
}
-
+
final String fileIdPath = dirFileIdPath + "/" + fileName;
SetAttr3 setAttr3 = request.getObjAttr();
FsPermission permission = setAttr3.getUpdateFields().contains(
@@ -966,7 +1052,7 @@ public class RpcProgramNfs3 extends RpcP
setAttr3.setGid(securityHandler.getGid());
}
setattrInternal(dfsClient, fileIdPath, setAttr3, false);
-
+
postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
objFileHandle = new FileHandle(postOpObjAttr.getFileId());
WccData dirWcc = Nfs3Utils.createWccData(
@@ -983,33 +1069,34 @@ public class RpcProgramNfs3 extends RpcP
LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e);
}
}
+
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
postOpDirAttr);
- if (e instanceof AccessControlException) {
- return new MKDIR3Response(Nfs3Status.NFS3ERR_PERM, objFileHandle,
- postOpObjAttr, dirWcc);
- } else {
- return new MKDIR3Response(Nfs3Status.NFS3ERR_IO, objFileHandle,
- postOpObjAttr, dirWcc);
- }
+ int status = mapErrorStatus(e);
+ return new MKDIR3Response(status, objFileHandle, postOpObjAttr, dirWcc);
}
}
- public READDIR3Response mknod(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ @Override
+ public READDIR3Response mknod(XDR xdr, RpcInfo info) {
return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
}
-
+
@Override
- public REMOVE3Response remove(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
+ public REMOVE3Response remove(XDR xdr, RpcInfo info) {
+ return remove(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
REMOVE3Request request = null;
try {
request = new REMOVE3Request(xdr);
@@ -1034,17 +1121,19 @@ public class RpcProgramNfs3 extends RpcP
return new REMOVE3Response(Nfs3Status.NFS3ERR_STALE);
}
+ WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+ preOpDirAttr);
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
+ return new REMOVE3Response(Nfs3Status.NFS3ERR_ACCES, errWcc);
+ }
+
String fileIdPath = dirFileIdPath + "/" + fileName;
HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
if (fstat == null) {
- WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
- preOpDirAttr);
- return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, dirWcc);
+ return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
}
if (fstat.isDir()) {
- WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
- preOpDirAttr);
- return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, dirWcc);
+ return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, errWcc);
}
boolean result = dfsClient.delete(fileIdPath, false);
@@ -1065,26 +1154,29 @@ public class RpcProgramNfs3 extends RpcP
LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e1);
}
}
+
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
postOpDirAttr);
- if (e instanceof AccessControlException) {
- return new REMOVE3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
- } else {
- return new REMOVE3Response(Nfs3Status.NFS3ERR_IO, dirWcc);
- }
+ int status = mapErrorStatus(e);
+ return new REMOVE3Response(status, dirWcc);
}
}
@Override
- public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public RMDIR3Response rmdir(XDR xdr, RpcInfo info) {
+ return rmdir(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
RMDIR3Request request = null;
try {
request = new RMDIR3Request(xdr);
@@ -1109,10 +1201,10 @@ public class RpcProgramNfs3 extends RpcP
LOG.info("Can't get path for dir fileId:" + dirHandle.getFileId());
return new RMDIR3Response(Nfs3Status.NFS3ERR_STALE);
}
-
+
WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, errWcc);
}
@@ -1124,7 +1216,7 @@ public class RpcProgramNfs3 extends RpcP
if (!fstat.isDir()) {
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTDIR, errWcc);
}
-
+
if (fstat.getChildrenNum() > 0) {
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTEMPTY, errWcc);
}
@@ -1147,26 +1239,29 @@ public class RpcProgramNfs3 extends RpcP
LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e1);
}
}
+
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
postOpDirAttr);
- if (e instanceof AccessControlException) {
- return new RMDIR3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
- } else {
- return new RMDIR3Response(Nfs3Status.NFS3ERR_IO, dirWcc);
- }
+ int status = mapErrorStatus(e);
+ return new RMDIR3Response(status, dirWcc);
}
}
@Override
- public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public RENAME3Response rename(XDR xdr, RpcInfo info) {
+ return rename(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
RENAME3Request request = null;
try {
request = new RENAME3Request(xdr);
@@ -1203,8 +1298,8 @@ public class RpcProgramNfs3 extends RpcP
LOG.info("Can't get path for toHandle fileId:" + toHandle.getFileId());
return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
}
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
WccData fromWcc = new WccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
fromPreOpAttr);
WccData toWcc = new WccData(Nfs3Utils.getWccAttr(toPreOpAttr),
@@ -1225,7 +1320,7 @@ public class RpcProgramNfs3 extends RpcP
return new RENAME3Response(Nfs3Status.NFS3_OK, fromDirWcc, toDirWcc);
} catch (IOException e) {
LOG.warn("Exception ", e);
- // Try to return correct WccData
+ // Try to return correct WccData
try {
fromDirWcc = Nfs3Utils.createWccData(
Nfs3Utils.getWccAttr(fromPreOpAttr), dfsClient, fromDirFileIdPath,
@@ -1236,21 +1331,23 @@ public class RpcProgramNfs3 extends RpcP
LOG.info("Can't get postOpDirAttr for " + fromDirFileIdPath + " or"
+ toDirFileIdPath, e1);
}
- if (e instanceof AccessControlException) {
- return new RENAME3Response(Nfs3Status.NFS3ERR_PERM, fromDirWcc,
- toDirWcc);
- } else {
- return new RENAME3Response(Nfs3Status.NFS3ERR_IO, fromDirWcc, toDirWcc);
- }
+
+ int status = mapErrorStatus(e);
+ return new RENAME3Response(status, fromDirWcc, toDirWcc);
}
}
@Override
- public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public SYMLINK3Response symlink(XDR xdr, RpcInfo info) {
+ return symlink(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
@@ -1300,13 +1397,14 @@ public class RpcProgramNfs3 extends RpcP
} catch (IOException e) {
LOG.warn("Exception:" + e);
- response.setStatus(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ response.setStatus(status);
return response;
}
}
- public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ @Override
+ public READDIR3Response link(XDR xdr, RpcInfo info) {
return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
}
@@ -1332,23 +1430,27 @@ public class RpcProgramNfs3 extends RpcP
}
return dlisting;
}
-
+
@Override
+ public READDIR3Response readdir(XDR xdr, RpcInfo info) {
+ return readdir(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ SocketAddress remoteAddress) {
READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
-
+
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
READDIR3Request request = null;
try {
request = new READDIR3Request(xdr);
@@ -1367,7 +1469,7 @@ public class RpcProgramNfs3 extends RpcP
LOG.info("Nonpositive count in invalid READDIR request:" + count);
return new READDIR3Response(Nfs3Status.NFS3_OK);
}
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("NFS READDIR fileId: " + handle.getFileId() + " cookie: "
+ cookie + " count: " + count);
@@ -1391,9 +1493,24 @@ public class RpcProgramNfs3 extends RpcP
}
long cookieVerf = request.getCookieVerf();
if ((cookieVerf != 0) && (cookieVerf != dirStatus.getModificationTime())) {
- LOG.error("CookierVerf mismatch. request cookierVerf:" + cookieVerf
- + " dir cookieVerf:" + dirStatus.getModificationTime());
- return new READDIR3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+ if (aixCompatMode) {
+ // The AIX NFS client misinterprets RFC-1813 and will repeatedly send
+ // the same cookieverf value even across VFS-level readdir calls,
+ // instead of getting a new cookieverf for every VFS-level readdir
+ // call, and reusing the cookieverf only in the event that multiple
+ // incremental NFS-level readdir calls must be made to fetch all of
+ // the directory entries. This means that whenever a readdir call is
+ // made by an AIX NFS client for a given directory, and that directory
+ // is subsequently modified, thus changing its mtime, no later readdir
+ // calls will succeed from AIX for that directory until the FS is
+ // unmounted/remounted. See HDFS-6549 for more info.
+ LOG.warn("AIX compatibility mode enabled, ignoring cookieverf " +
+ "mismatches.");
+ } else {
+ LOG.error("CookieVerf mismatch. request cookieVerf: " + cookieVerf
+ + " dir cookieVerf: " + dirStatus.getModificationTime());
+ return new READDIR3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+ }
}
if (cookie == 0) {
@@ -1417,7 +1534,7 @@ public class RpcProgramNfs3 extends RpcP
String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
startAfter = inodeIdPath.getBytes();
}
-
+
dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
postOpAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
if (postOpAttr == null) {
@@ -1426,21 +1543,22 @@ public class RpcProgramNfs3 extends RpcP
}
} catch (IOException e) {
LOG.warn("Exception ", e);
- return new READDIR3Response(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ return new READDIR3Response(status);
}
/**
* Set up the dirents in the response. fileId is used as the cookie with one
* exception. Linux client can either be stuck with "ls" command (on REHL)
* or report "Too many levels of symbolic links" (Ubuntu).
- *
+ *
* The problem is that, only two items returned, "." and ".." when the
* namespace is empty. Both of them are "/" with the same cookie(root
* fileId). Linux client doesn't think such a directory is a real directory.
* Even though NFS protocol specifies cookie is an opaque data, Linux client
* somehow doesn't like an empty dir returns same cookie for both "." and
* "..".
- *
+ *
* The workaround is to use 0 as the cookie for "." and always return "." as
* the first entry in readdir/readdirplus response.
*/
@@ -1448,7 +1566,7 @@ public class RpcProgramNfs3 extends RpcP
int n = (int) Math.min(fstatus.length, count-2);
boolean eof = (n < fstatus.length) ? false : (dlisting
.getRemainingEntries() == 0);
-
+
Entry3[] entries;
if (cookie == 0) {
entries = new Entry3[n + 2];
@@ -1468,23 +1586,29 @@ public class RpcProgramNfs3 extends RpcP
fstatus[i].getLocalName(), fstatus[i].getFileId());
}
}
-
+
DirList3 dirList = new READDIR3Response.DirList3(entries, eof);
return new READDIR3Response(Nfs3Status.NFS3_OK, postOpAttr,
dirStatus.getModificationTime(), dirList);
}
- public READDIRPLUS3Response readdirplus(XDR xdr,
- SecurityHandler securityHandler, InetAddress client) {
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+ @Override
+ public READDIRPLUS3Response readdirplus(XDR xdr, RpcInfo info) {
+ return readdirplus(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
}
-
+
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
}
-
+
READDIRPLUS3Request request = null;
try {
request = new READDIRPLUS3Request(xdr);
@@ -1509,7 +1633,7 @@ public class RpcProgramNfs3 extends RpcP
LOG.info("Nonpositive maxcount in invalid READDIRPLUS request:" + maxCount);
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
}
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: "
+ cookie + " dirCount: " + dirCount + " maxCount: " + maxCount);
@@ -1533,9 +1657,22 @@ public class RpcProgramNfs3 extends RpcP
}
long cookieVerf = request.getCookieVerf();
if ((cookieVerf != 0) && (cookieVerf != dirStatus.getModificationTime())) {
- LOG.error("CookierVerf mismatch. request cookierVerf:" + cookieVerf
- + " dir cookieVerf:" + dirStatus.getModificationTime());
- return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+ if (aixCompatMode) {
+ // The AIX NFS client misinterprets RFC-1813 and will repeatedly send
+ // the same cookieverf value even across VFS-level readdir calls,
+ // instead of getting a new cookieverf for every VFS-level readdir
+ // call. This means that whenever a readdir call is made by an AIX NFS
+ // client for a given directory, and that directory is subsequently
+ // modified, thus changing its mtime, no later readdir calls will
+ // succeed for that directory from AIX until the FS is
+ // unmounted/remounted. See HDFS-6549 for more info.
+ LOG.warn("AIX compatibility mode enabled, ignoring cookieverf " +
+ "mismatches.");
+ } else {
+ LOG.error("cookieverf mismatch. request cookieverf: " + cookieVerf
+ + " dir cookieverf: " + dirStatus.getModificationTime());
+ return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+ }
}
if (cookie == 0) {
@@ -1559,7 +1696,7 @@ public class RpcProgramNfs3 extends RpcP
String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
startAfter = inodeIdPath.getBytes();
}
-
+
dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
if (postOpDirAttr == null) {
@@ -1568,19 +1705,20 @@ public class RpcProgramNfs3 extends RpcP
}
} catch (IOException e) {
LOG.warn("Exception ", e);
- return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ return new READDIRPLUS3Response(status);
}
-
+
// Set up the dirents in the response
HdfsFileStatus[] fstatus = dlisting.getPartialListing();
int n = (int) Math.min(fstatus.length, dirCount-2);
boolean eof = (n < fstatus.length) ? false : (dlisting
.getRemainingEntries() == 0);
-
+
READDIRPLUS3Response.EntryPlus3[] entries;
if (cookie == 0) {
entries = new READDIRPLUS3Response.EntryPlus3[n+2];
-
+
entries[0] = new READDIRPLUS3Response.EntryPlus3(
postOpDirAttr.getFileId(), ".", 0, postOpDirAttr, new FileHandle(
postOpDirAttr.getFileId()));
@@ -1624,23 +1762,28 @@ public class RpcProgramNfs3 extends RpcP
return new READDIRPLUS3Response(Nfs3Status.NFS3_OK, postOpDirAttr,
dirStatus.getModificationTime(), dirListPlus);
}
-
+
@Override
- public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public FSSTAT3Response fsstat(XDR xdr, RpcInfo info) {
+ return fsstat(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
-
+
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
FSSTAT3Request request = null;
try {
request = new FSSTAT3Request(xdr);
@@ -1655,18 +1798,17 @@ public class RpcProgramNfs3 extends RpcP
}
try {
- // Use superUserClient to get file system status
- FsStatus fsStatus = superUserClient.getDiskStatus();
+ FsStatus fsStatus = dfsClient.getDiskStatus();
long totalBytes = fsStatus.getCapacity();
long freeBytes = fsStatus.getRemaining();
-
+
Nfs3FileAttributes attrs = writeManager.getFileAttr(dfsClient, handle,
iug);
if (attrs == null) {
LOG.info("Can't get path for fileId:" + handle.getFileId());
return new FSSTAT3Response(Nfs3Status.NFS3ERR_STALE);
}
-
+
long maxFsObjects = config.getLong("dfs.max.objects", 0);
if (maxFsObjects == 0) {
// A value of zero in HDFS indicates no limit to the number
@@ -1674,31 +1816,48 @@ public class RpcProgramNfs3 extends RpcP
// Long.MAX_VALUE so 32bit client won't complain.
maxFsObjects = Integer.MAX_VALUE;
}
-
+
return new FSSTAT3Response(Nfs3Status.NFS3_OK, attrs, totalBytes,
freeBytes, freeBytes, maxFsObjects, maxFsObjects, maxFsObjects, 0);
+ } catch (RemoteException r) {
+ LOG.warn("Exception ", r);
+ IOException io = r.unwrapRemoteException();
+ /**
+ * AuthorizationException can be thrown if the user can't be proxy'ed.
+ */
+ if (io instanceof AuthorizationException) {
+ return new FSSTAT3Response(Nfs3Status.NFS3ERR_ACCES);
+ } else {
+ return new FSSTAT3Response(Nfs3Status.NFS3ERR_IO);
+ }
} catch (IOException e) {
LOG.warn("Exception ", e);
- return new FSSTAT3Response(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ return new FSSTAT3Response(status);
}
}
@Override
- public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public FSINFO3Response fsinfo(XDR xdr, RpcInfo info) {
+ return fsinfo(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
-
+
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
FSINFO3Request request = null;
try {
request = new FSINFO3Request(xdr);
@@ -1713,12 +1872,15 @@ public class RpcProgramNfs3 extends RpcP
}
try {
- int rtmax = config.getInt(Nfs3Constant.MAX_READ_TRANSFER_SIZE_KEY,
- Nfs3Constant.MAX_READ_TRANSFER_SIZE_DEFAULT);
- int wtmax = config.getInt(Nfs3Constant.MAX_WRITE_TRANSFER_SIZE_KEY,
- Nfs3Constant.MAX_WRITE_TRANSFER_SIZE_DEFAULT);
- int dtperf = config.getInt(Nfs3Constant.MAX_READDIR_TRANSFER_SIZE_KEY,
- Nfs3Constant.MAX_READDIR_TRANSFER_SIZE_DEFAULT);
+ int rtmax = config.getInt(
+ NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY,
+ NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_DEFAULT);
+ int wtmax = config.getInt(
+ NfsConfigKeys.DFS_NFS_MAX_WRITE_TRANSFER_SIZE_KEY,
+ NfsConfigKeys.DFS_NFS_MAX_WRITE_TRANSFER_SIZE_DEFAULT);
+ int dtperf = config.getInt(
+ NfsConfigKeys.DFS_NFS_MAX_READDIR_TRANSFER_SIZE_KEY,
+ NfsConfigKeys.DFS_NFS_MAX_READDIR_TRANSFER_SIZE_DEFAULT);
Nfs3FileAttributes attrs = Nfs3Utils.getFileAttr(dfsClient,
Nfs3Utils.getFileIdPath(handle), iug);
@@ -1726,7 +1888,7 @@ public class RpcProgramNfs3 extends RpcP
LOG.info("Can't get path for fileId:" + handle.getFileId());
return new FSINFO3Response(Nfs3Status.NFS3ERR_STALE);
}
-
+
int fsProperty = Nfs3Constant.FSF3_CANSETTIME
| Nfs3Constant.FSF3_HOMOGENEOUS;
@@ -1734,26 +1896,32 @@ public class RpcProgramNfs3 extends RpcP
wtmax, wtmax, 1, dtperf, Long.MAX_VALUE, new NfsTime(1), fsProperty);
} catch (IOException e) {
LOG.warn("Exception ", e);
- return new FSINFO3Response(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ return new FSINFO3Response(status);
}
}
@Override
- public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
- InetAddress client) {
+ public PATHCONF3Response pathconf(XDR xdr, RpcInfo info) {
+ return pathconf(xdr, getSecurityHandler(info), info.remoteAddress());
+ }
+
+ @VisibleForTesting
+ PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
+ SocketAddress remoteAddress) {
PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
-
+
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
PATHCONF3Request request = null;
try {
request = new PATHCONF3Request(xdr);
@@ -1781,20 +1949,30 @@ public class RpcProgramNfs3 extends RpcP
HdfsConstants.MAX_PATH_LENGTH, true, false, false, true);
} catch (IOException e) {
LOG.warn("Exception ", e);
- return new PATHCONF3Response(Nfs3Status.NFS3ERR_IO);
+ int status = mapErrorStatus(e);
+ return new PATHCONF3Response(status);
}
}
@Override
- public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
- SecurityHandler securityHandler, InetAddress client) {
+ public COMMIT3Response commit(XDR xdr, RpcInfo info) {
+ SecurityHandler securityHandler = getSecurityHandler(info);
+ RpcCall rpcCall = (RpcCall) info.header();
+ int xid = rpcCall.getXid();
+ SocketAddress remoteAddress = info.remoteAddress();
+ return commit(xdr, info.channel(), xid, securityHandler, remoteAddress);
+ }
+
+ @VisibleForTesting
+ COMMIT3Response commit(XDR xdr, Channel channel, int xid,
+ SecurityHandler securityHandler, SocketAddress remoteAddress) {
COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
-
+
COMMIT3Request request = null;
try {
request = new COMMIT3Request(xdr);
@@ -1818,19 +1996,19 @@ public class RpcProgramNfs3 extends RpcP
LOG.info("Can't get path for fileId:" + handle.getFileId());
return new COMMIT3Response(Nfs3Status.NFS3ERR_STALE);
}
-
- if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+
+ if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
return new COMMIT3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
Nfs3Utils.getWccAttr(preOpAttr), preOpAttr),
Nfs3Constant.WRITE_COMMIT_VERF);
}
-
+
long commitOffset = (request.getCount() == 0) ? 0
: (request.getOffset() + request.getCount());
-
+
// Insert commit as an async request
- writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
- preOpAttr);
+ writeManager.handleCommit(dfsClient, handle, commitOffset,
+ channel, xid, preOpAttr);
return null;
} catch (IOException e) {
LOG.warn("Exception ", e);
@@ -1840,9 +2018,11 @@ public class RpcProgramNfs3 extends RpcP
} catch (IOException e1) {
LOG.info("Can't get postOpAttr for fileId: " + handle.getFileId(), e1);
}
+
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
- return new COMMIT3Response(Nfs3Status.NFS3ERR_IO, fileWcc,
- Nfs3Constant.WRITE_COMMIT_VERF);
+ int status = mapErrorStatus(e);
+ return new COMMIT3Response(status, fileWcc,
+ Nfs3Constant.WRITE_COMMIT_VERF);
}
}
@@ -1855,11 +2035,16 @@ public class RpcProgramNfs3 extends RpcP
return null;
}
}
-
+
+ private SecurityHandler getSecurityHandler(RpcInfo info) {
+ RpcCall rpcCall = (RpcCall) info.header();
+ return getSecurityHandler(rpcCall.getCredential(), rpcCall.getVerifier());
+ }
+
@Override
public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
RpcCall rpcCall = (RpcCall) info.header();
- final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
+ final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid();
byte[] data = new byte[info.data().readableBytes()];
info.data().readBytes(data);
@@ -1867,9 +2052,8 @@ public class RpcProgramNfs3 extends RpcP
XDR out = new XDR();
InetAddress client = ((InetSocketAddress) info.remoteAddress())
.getAddress();
- Channel channel = info.channel();
-
Credentials credentials = rpcCall.getCredential();
+
// Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
if (nfsproc3 != NFSPROC3.NULL) {
if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
@@ -1906,28 +2090,25 @@ public class RpcProgramNfs3 extends RpcP
}
}
}
-
- SecurityHandler securityHandler = getSecurityHandler(credentials,
- rpcCall.getVerifier());
-
+
NFS3Response response = null;
if (nfsproc3 == NFSPROC3.NULL) {
response = nullProcedure();
} else if (nfsproc3 == NFSPROC3.GETATTR) {
- response = getattr(xdr, securityHandler, client);
+ response = getattr(xdr, info);
} else if (nfsproc3 == NFSPROC3.SETATTR) {
- response = setattr(xdr, securityHandler, client);
+ response = setattr(xdr, info);
} else if (nfsproc3 == NFSPROC3.LOOKUP) {
- response = lookup(xdr, securityHandler, client);
+ response = lookup(xdr, info);
} else if (nfsproc3 == NFSPROC3.ACCESS) {
- response = access(xdr, securityHandler, client);
+ response = access(xdr, info);
} else if (nfsproc3 == NFSPROC3.READLINK) {
- response = readlink(xdr, securityHandler, client);
+ response = readlink(xdr, info);
} else if (nfsproc3 == NFSPROC3.READ) {
if (LOG.isDebugEnabled()) {
LOG.debug(Nfs3Utils.READ_RPC_START + xid);
- }
- response = read(xdr, securityHandler, client);
+ }
+ response = read(xdr, info);
if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
LOG.debug(Nfs3Utils.READ_RPC_END + xid);
}
@@ -1935,36 +2116,36 @@ public class RpcProgramNfs3 extends RpcP
if (LOG.isDebugEnabled()) {
LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
}
- response = write(xdr, channel, xid, securityHandler, client);
+ response = write(xdr, info);
// Write end debug trace is in Nfs3Utils.writeChannel
} else if (nfsproc3 == NFSPROC3.CREATE) {
- response = create(xdr, securityHandler, client);
- } else if (nfsproc3 == NFSPROC3.MKDIR) {
- response = mkdir(xdr, securityHandler, client);
+ response = create(xdr, info);
+ } else if (nfsproc3 == NFSPROC3.MKDIR) {
+ response = mkdir(xdr, info);
} else if (nfsproc3 == NFSPROC3.SYMLINK) {
- response = symlink(xdr, securityHandler, client);
+ response = symlink(xdr, info);
} else if (nfsproc3 == NFSPROC3.MKNOD) {
- response = mknod(xdr, securityHandler, client);
+ response = mknod(xdr, info);
} else if (nfsproc3 == NFSPROC3.REMOVE) {
- response = remove(xdr, securityHandler, client);
+ response = remove(xdr, info);
} else if (nfsproc3 == NFSPROC3.RMDIR) {
- response = rmdir(xdr, securityHandler, client);
+ response = rmdir(xdr, info);
} else if (nfsproc3 == NFSPROC3.RENAME) {
- response = rename(xdr, securityHandler, client);
+ response = rename(xdr, info);
} else if (nfsproc3 == NFSPROC3.LINK) {
- response = link(xdr, securityHandler, client);
+ response = link(xdr, info);
} else if (nfsproc3 == NFSPROC3.READDIR) {
- response = readdir(xdr, securityHandler, client);
+ response = readdir(xdr, info);
} else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
- response = readdirplus(xdr, securityHandler, client);
+ response = readdirplus(xdr, info);
} else if (nfsproc3 == NFSPROC3.FSSTAT) {
- response = fsstat(xdr, securityHandler, client);
+ response = fsstat(xdr, info);
} else if (nfsproc3 == NFSPROC3.FSINFO) {
- response = fsinfo(xdr, securityHandler, client);
+ response = fsinfo(xdr, info);
} else if (nfsproc3 == NFSPROC3.PATHCONF) {
- response = pathconf(xdr, securityHandler, client);
+ response = pathconf(xdr,info);
} else if (nfsproc3 == NFSPROC3.COMMIT) {
- response = commit(xdr, channel, xid, securityHandler, client);
+ response = commit(xdr, info);
} else {
// Invalid procedure
RpcAcceptedReply.getInstance(xid,
@@ -1990,15 +2171,25 @@ public class RpcProgramNfs3 extends RpcP
RpcUtil.sendRpcResponse(ctx, rsp);
}
-
+
@Override
protected boolean isIdempotent(RpcCall call) {
- final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(call.getProcedure());
+ final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(call.getProcedure());
return nfsproc3 == null || nfsproc3.isIdempotent();
}
-
- private boolean checkAccessPrivilege(final InetAddress client,
+
+ private boolean checkAccessPrivilege(SocketAddress remoteAddress,
final AccessPrivilege expected) {
+ // Port monitoring
+ if (!doPortMonitoring(remoteAddress)) {
+ return false;
+ }
+
+ // Check export table
+ if (exports == null) {
+ return false;
+ }
+ InetAddress client = ((InetSocketAddress) remoteAddress).getAddress();
AccessPrivilege access = exports.getAccessPrivilege(client);
if (access == AccessPrivilege.NONE) {
return false;
@@ -2009,7 +2200,7 @@ public class RpcProgramNfs3 extends RpcP
}
return true;
}
-
+
@VisibleForTesting
WriteManager getWriteManager() {
return this.writeManager;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Tue Aug 19 23:49:39 2014
@@ -21,10 +21,11 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
@@ -50,13 +51,14 @@ import com.google.common.annotations.Vis
public class WriteManager {
public static final Log LOG = LogFactory.getLog(WriteManager.class);
- private final Configuration config;
+ private final NfsConfiguration config;
private final IdUserGroup iug;
private AsyncDataService asyncDataService;
private boolean asyncDataServiceStarted = false;
private final int maxStreams;
+ private final boolean aixCompatMode;
/**
* The time limit to wait for accumulate reordered sequential writes to the
@@ -78,19 +80,21 @@ public class WriteManager {
return fileContextCache.put(h, ctx);
}
- WriteManager(IdUserGroup iug, final Configuration config) {
+ WriteManager(IdUserGroup iug, final NfsConfiguration config,
+ boolean aixCompatMode) {
this.iug = iug;
this.config = config;
- streamTimeout = config.getLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT,
- Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
+ this.aixCompatMode = aixCompatMode;
+ streamTimeout = config.getLong(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY,
+ NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT);
LOG.info("Stream timeout is " + streamTimeout + "ms.");
- if (streamTimeout < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
+ if (streamTimeout < NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT) {
LOG.info("Reset stream timeout to minimum value "
- + Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + "ms.");
- streamTimeout = Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT;
+ + NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + "ms.");
+ streamTimeout = NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT;
}
- maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES,
- Nfs3Constant.MAX_OPEN_FILES_DEFAULT);
+ maxStreams = config.getInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY,
+ NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_DEFAULT);
LOG.info("Maximum open streams is "+ maxStreams);
this.fileContextCache = new OpenFileCtxCache(config, streamTimeout);
}
@@ -171,10 +175,10 @@ public class WriteManager {
}
// Add open stream
- String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
- Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
+ String writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
+ NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
- + fileHandle.getFileId(), dfsClient, iug);
+ + fileHandle.getFileId(), dfsClient, iug, aixCompatMode);
if (!addOpenFileStream(fileHandle, openFileCtx)) {
LOG.info("Can't add new stream. Close it. Tell client to retry.");
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java Tue Aug 19 23:49:39 2014
@@ -23,8 +23,8 @@ import java.net.InetAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;
import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3;
@@ -38,7 +38,7 @@ public class TestMountd {
@Test
public void testStart() throws IOException {
// Start minicluster
- Configuration config = new Configuration();
+ NfsConfiguration config = new NfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(1)
.build();
cluster.waitActive();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,8 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
@@ -155,10 +156,10 @@ public class TestOutOfOrderWrite {
Arrays.fill(data3, (byte) 9);
// NFS3 Create request
- Configuration conf = new Configuration();
+ NfsConfiguration conf = new NfsConfiguration();
WriteClient client = new WriteClient("localhost", conf.getInt(
- Nfs3Constant.NFS3_SERVER_PORT, Nfs3Constant.NFS3_SERVER_PORT_DEFAULT),
- create(), false);
+ NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
+ NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), create(), false);
client.run();
while (handle == null) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java Tue Aug 19 23:49:39 2014
@@ -18,24 +18,24 @@
package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThat;
-import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
public class TestDFSClientCache {
@Test
public void testEviction() throws IOException {
- Configuration conf = new Configuration();
+ NfsConfiguration conf = new NfsConfiguration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
// Only one entry will be in the cache
@@ -59,7 +59,7 @@ public class TestDFSClientCache {
String currentUser = "test-user";
- Configuration conf = new Configuration();
+ NfsConfiguration conf = new NfsConfiguration();
UserGroupInformation currentUserUgi
= UserGroupInformation.createRemoteUser(currentUser);
currentUserUgi.setAuthenticationMethod(KERBEROS);
@@ -83,7 +83,7 @@ public class TestDFSClientCache {
UserGroupInformation currentUserUgi = UserGroupInformation
.createUserForTesting(currentUser, new String[0]);
- Configuration conf = new Configuration();
+ NfsConfiguration conf = new NfsConfiguration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
DFSClientCache cache = new DFSClientCache(conf);
UserGroupInformation ugiResult
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java Tue Aug 19 23:49:39 2014
@@ -21,22 +21,22 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
-import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.junit.Test;
public class TestExportsTable {
@Test
public void testExportPoint() throws IOException {
- Configuration config = new Configuration();
+ NfsConfiguration config = new NfsConfiguration();
MiniDFSCluster cluster = null;
String exportPoint = "/myexport1";
- config.setStrings(Nfs3Constant.EXPORT_POINT, exportPoint);
+ config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, exportPoint);
// Use emphral port in case tests are running in parallel
config.setInt("nfs3.mountd.port", 0);
config.setInt("nfs3.server.port", 0);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java Tue Aug 19 23:49:39 2014
@@ -22,13 +22,13 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
-import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.junit.Test;
import org.mockito.Mockito;
@@ -38,10 +38,10 @@ public class TestOpenFileCtxCache {
@Test
public void testEviction() throws IOException, InterruptedException {
- Configuration conf = new Configuration();
+ NfsConfiguration conf = new NfsConfiguration();
// Only two entries will be in the cache
- conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);
+ conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);
DFSClient dfsClient = Mockito.mock(DFSClient.class);
Nfs3FileAttributes attr = new Nfs3FileAttributes();
@@ -49,15 +49,15 @@ public class TestOpenFileCtxCache {
Mockito.when(fos.getPos()).thenReturn((long) 0);
OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
- dfsClient, new IdUserGroup());
+ dfsClient, new IdUserGroup(new NfsConfiguration()));
OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
- dfsClient, new IdUserGroup());
+ dfsClient, new IdUserGroup(new NfsConfiguration()));
OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
- dfsClient, new IdUserGroup());
+ dfsClient, new IdUserGroup(new NfsConfiguration()));
OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
- dfsClient, new IdUserGroup());
+ dfsClient, new IdUserGroup(new NfsConfiguration()));
OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
- dfsClient, new IdUserGroup());
+ dfsClient, new IdUserGroup(new NfsConfiguration()));
OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);
@@ -71,7 +71,7 @@ public class TestOpenFileCtxCache {
assertTrue(cache.size() == 2);
// Wait for the oldest stream to be evict-able, insert again
- Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+ Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
assertTrue(cache.size() == 2);
ret = cache.put(new FileHandle(3), context3);
@@ -90,17 +90,17 @@ public class TestOpenFileCtxCache {
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
context4.getPendingCommitsForTest().put(new Long(100),
new CommitCtx(0, null, 0, attr));
- Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+ Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
ret = cache.put(new FileHandle(5), context5);
assertFalse(ret);
}
@Test
public void testScan() throws IOException, InterruptedException {
- Configuration conf = new Configuration();
+ NfsConfiguration conf = new NfsConfiguration();
// Only two entries will be in the cache
- conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);
+ conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);
DFSClient dfsClient = Mockito.mock(DFSClient.class);
Nfs3FileAttributes attr = new Nfs3FileAttributes();
@@ -108,13 +108,13 @@ public class TestOpenFileCtxCache {
Mockito.when(fos.getPos()).thenReturn((long) 0);
OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
- dfsClient, new IdUserGroup());
+ dfsClient, new IdUserGroup(new NfsConfiguration()));
OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
- dfsClient, new IdUserGroup());
+ dfsClient, new IdUserGroup(new NfsConfiguration()));
OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
- dfsClient, new IdUserGroup());
+ dfsClient, new IdUserGroup(new NfsConfiguration()));
OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
- dfsClient, new IdUserGroup());
+ dfsClient, new IdUserGroup(new NfsConfiguration()));
OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);
@@ -123,8 +123,8 @@ public class TestOpenFileCtxCache {
assertTrue(ret);
ret = cache.put(new FileHandle(2), context2);
assertTrue(ret);
- Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + 1);
- cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+ Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + 1);
+ cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
assertTrue(cache.size() == 0);
// Test cleaning inactive entry
@@ -133,7 +133,7 @@ public class TestOpenFileCtxCache {
ret = cache.put(new FileHandle(4), context4);
assertTrue(ret);
context3.setActiveStatusForTest(false);
- cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
+ cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT);
assertTrue(cache.size() == 1);
assertTrue(cache.get(new FileHandle(3)) == null);
assertTrue(cache.get(new FileHandle(4)) != null);