You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [4/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Tue Aug 19 23:49:39 2014
@@ -20,14 +20,15 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@@ -40,10 +41,11 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.nfs.AccessPrivilege;
 import org.apache.hadoop.nfs.NfsExports;
@@ -123,6 +125,7 @@ import org.apache.hadoop.oncrpc.security
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -130,9 +133,6 @@ import org.jboss.netty.channel.ChannelHa
 
 import com.google.common.annotations.VisibleForTesting;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NFS_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY;
-
 /**
  * RPC program corresponding to nfs daemon. See {@link Nfs3}.
  */
@@ -140,43 +140,43 @@ public class RpcProgramNfs3 extends RpcP
   public static final int DEFAULT_UMASK = 0022;
   public static final FsPermission umask = new FsPermission(
       (short) DEFAULT_UMASK);
-  
+
   static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class);
 
-  private final Configuration config = new Configuration();
+  private final NfsConfiguration config;
   private final WriteManager writeManager;
   private final IdUserGroup iug;
   private final DFSClientCache clientCache;
 
   private final NfsExports exports;
-  
-  /**
-   * superUserClient should always impersonate HDFS file system owner to send
-   * requests which requires supergroup privilege. This requires the same user
-   * to start HDFS and NFS.
-   */
-  private final DFSClient superUserClient;
-  
+
   private final short replication;
   private final long blockSize;
   private final int bufferSize;
+  private final boolean aixCompatMode;
   private Statistics statistics;
   private String writeDumpDir; // The dir save dump files
-  
+
   private final RpcCallCache rpcCallCache;
 
-  public RpcProgramNfs3(Configuration config) throws IOException {
-    super("NFS3", "localhost", config.getInt(Nfs3Constant.NFS3_SERVER_PORT,
-        Nfs3Constant.NFS3_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
-        Nfs3Constant.VERSION, Nfs3Constant.VERSION);
-   
+  public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket,
+      boolean allowInsecurePorts) throws IOException {
+    super("NFS3", "localhost", config.getInt(
+        NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
+        NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
+        Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
+        allowInsecurePorts);
+
+    this.config = config;
     config.set(FsPermission.UMASK_LABEL, "000");
-    iug = new IdUserGroup();
-    
+    iug = new IdUserGroup(config);
+
+    aixCompatMode = config.getBoolean(
+        NfsConfigKeys.AIX_COMPAT_MODE_KEY,
+        NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
     exports = NfsExports.getInstance(config);
-    writeManager = new WriteManager(iug, config);
+    writeManager = new WriteManager(iug, config, aixCompatMode);
     clientCache = new DFSClientCache(config);
-    superUserClient = new DFSClient(NameNode.getAddress(config), config);
     replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
         DFSConfigKeys.DFS_REPLICATION_DEFAULT);
     blockSize = config.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
@@ -184,14 +184,14 @@ public class RpcProgramNfs3 extends RpcP
     bufferSize = config.getInt(
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-    
-    writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
-        Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
-    boolean enableDump = config.getBoolean(Nfs3Constant.ENABLE_FILE_DUMP_KEY,
-        Nfs3Constant.ENABLE_FILE_DUMP_DEFAULT);
+
+    writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
+        NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
+    boolean enableDump = config.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY,
+        NfsConfigKeys.DFS_NFS_FILE_DUMP_DEFAULT);
     UserGroupInformation.setConfiguration(config);
-    SecurityUtil.login(config, DFS_NFS_KEYTAB_FILE_KEY,
-            DFS_NFS_KERBEROS_PRINCIPAL_KEY);
+    SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY,
+        NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY);
 
     if (!enableDump) {
       writeDumpDir = null;
@@ -216,12 +216,23 @@ public class RpcProgramNfs3 extends RpcP
       throw new IOException("Cannot create dump directory " + dumpDir);
     }
   }
-  
+
   @Override
   public void startDaemons() {
      writeManager.startAsyncDataSerivce();
   }
-  
+
+  // Checks the type of IOException and maps it to appropriate Nfs3Status code.
+  private int mapErrorStatus(IOException e) {
+    if (e instanceof FileNotFoundException) {
+      return Nfs3Status.NFS3ERR_STALE;
+    } else if (e instanceof AccessControlException) {
+      return Nfs3Status.NFS3ERR_ACCES;
+    } else {
+      return Nfs3Status.NFS3ERR_IO;
+    }
+  }
+
   /******************************************************
    * RPC call handlers
    ******************************************************/
@@ -235,21 +246,26 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public GETATTR3Response getattr(XDR xdr, RpcInfo info) {
+    return getattr(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     GETATTR3Request request = null;
     try {
       request = new GETATTR3Request(xdr);
@@ -267,9 +283,21 @@ public class RpcProgramNfs3 extends RpcP
     Nfs3FileAttributes attrs = null;
     try {
       attrs = writeManager.getFileAttr(dfsClient, handle, iug);
+    } catch (RemoteException r) {
+      LOG.warn("Exception ", r);
+      IOException io = r.unwrapRemoteException();
+      /**
+       * AuthorizationException can be thrown if the user can't be proxy'ed.
+       */
+      if (io instanceof AuthorizationException) {
+        return new GETATTR3Response(Nfs3Status.NFS3ERR_ACCES);
+      } else {
+        return new GETATTR3Response(Nfs3Status.NFS3ERR_IO);
+      }
     } catch (IOException e) {
       LOG.info("Can't get file attribute, fileId=" + handle.getFileId(), e);
-      response.setStatus(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      response.setStatus(status);
       return response;
     }
     if (attrs == null) {
@@ -286,7 +314,7 @@ public class RpcProgramNfs3 extends RpcP
   private void setattrInternal(DFSClient dfsClient, String fileIdPath,
       SetAttr3 newAttr, boolean setMode) throws IOException {
     EnumSet<SetAttrField> updateFields = newAttr.getUpdateFields();
-    
+
     if (setMode && updateFields.contains(SetAttrField.MODE)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("set new mode:" + newAttr.getMode());
@@ -316,15 +344,20 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public SETATTR3Response setattr(XDR xdr, RpcInfo info) {
+    return setattr(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     SETATTR3Request request = null;
     try {
       request = new SETATTR3Request(xdr);
@@ -362,9 +395,9 @@ public class RpcProgramNfs3 extends RpcP
           return new SETATTR3Response(Nfs3Status.NFS3ERR_NOT_SYNC, wccData);
         }
       }
-      
+
       // check the write access privilege
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             preOpWcc, preOpAttr));
       }
@@ -383,30 +416,33 @@ public class RpcProgramNfs3 extends RpcP
       } catch (IOException e1) {
         LOG.info("Can't get postOpAttr for fileIdPath: " + fileIdPath, e1);
       }
-      if (e instanceof AccessControlException) {
-        return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, wccData);
-      } else {
-        return new SETATTR3Response(Nfs3Status.NFS3ERR_IO, wccData);
-      }
+
+      int status = mapErrorStatus(e);
+      return new SETATTR3Response(status, wccData);
     }
   }
 
   @Override
-  public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public LOOKUP3Response lookup(XDR xdr, RpcInfo info) {
+    return lookup(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     LOOKUP3Request request = null;
     try {
       request = new LOOKUP3Request(xdr);
@@ -449,26 +485,32 @@ public class RpcProgramNfs3 extends RpcP
 
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new LOOKUP3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new LOOKUP3Response(status);
     }
   }
-  
+
   @Override
-  public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public ACCESS3Response access(XDR xdr, RpcInfo info) {
+    return access(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     ACCESS3Request request = null;
     try {
       request = new ACCESS3Request(xdr);
@@ -482,7 +524,7 @@ public class RpcProgramNfs3 extends RpcP
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS ACCESS fileId: " + handle.getFileId());
-    } 
+    }
 
     try {
       // HDFS-5804 removed supserUserClient access
@@ -493,20 +535,39 @@ public class RpcProgramNfs3 extends RpcP
         return new ACCESS3Response(Nfs3Status.NFS3ERR_STALE);
       }
       int access = Nfs3Utils.getAccessRightsForUserGroup(
-          securityHandler.getUid(), securityHandler.getGid(), attrs);
-      
+          securityHandler.getUid(), securityHandler.getGid(),
+          securityHandler.getAuxGids(), attrs);
+
       return new ACCESS3Response(Nfs3Status.NFS3_OK, attrs, access);
+    } catch (RemoteException r) {
+      LOG.warn("Exception ", r);
+      IOException io = r.unwrapRemoteException();
+      /**
+       * AuthorizationException can be thrown if the user can't be proxy'ed.
+       */
+      if (io instanceof AuthorizationException) {
+        return new ACCESS3Response(Nfs3Status.NFS3ERR_ACCES);
+      } else {
+        return new ACCESS3Response(Nfs3Status.NFS3ERR_IO);
+      }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new ACCESS3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new ACCESS3Response(status);
     }
   }
 
-  public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  @Override
+  public READLINK3Response readlink(XDR xdr, RpcInfo info) {
+    return readlink(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
 
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
@@ -550,8 +611,8 @@ public class RpcProgramNfs3 extends RpcP
             + handle.getFileId());
         return new READLINK3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
       }
-      int rtmax = config.getInt(Nfs3Constant.MAX_READ_TRANSFER_SIZE_KEY,
-              Nfs3Constant.MAX_READ_TRANSFER_SIZE_DEFAULT);
+      int rtmax = config.getInt(NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY,
+          NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_DEFAULT);
       if (rtmax < target.getBytes().length) {
         LOG.error("Link size: " + target.getBytes().length
             + " is larger than max transfer size: " + rtmax);
@@ -564,32 +625,33 @@ public class RpcProgramNfs3 extends RpcP
 
     } catch (IOException e) {
       LOG.warn("Readlink error: " + e.getClass(), e);
-      if (e instanceof FileNotFoundException) {
-        return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
-      } else if (e instanceof AccessControlException) {
-        return new READLINK3Response(Nfs3Status.NFS3ERR_ACCES);
-      }
-      return new READLINK3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READLINK3Response(status);
     }
   }
 
   @Override
-  public READ3Response read(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public READ3Response read(XDR xdr, RpcInfo info) {
+    return read(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  READ3Response read(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
     final String userName = securityHandler.getUser();
-    
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(userName);
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     READ3Request request = null;
 
     try {
@@ -629,7 +691,8 @@ public class RpcProgramNfs3 extends RpcP
         return new READ3Response(Nfs3Status.NFS3ERR_NOENT);
       }
       int access = Nfs3Utils.getAccessRightsForUserGroup(
-          securityHandler.getUid(), securityHandler.getGid(), attrs);
+          securityHandler.getUid(), securityHandler.getGid(),
+          securityHandler.getAuxGids(), attrs);
       if ((access & Nfs3Constant.ACCESS3_READ) != 0) {
         eof = offset < attrs.getSize() ? false : true;
         return new READ3Response(Nfs3Status.NFS3_OK, attrs, 0, eof,
@@ -638,7 +701,7 @@ public class RpcProgramNfs3 extends RpcP
         return new READ3Response(Nfs3Status.NFS3ERR_ACCES);
       }
     }
-    
+
     // In case there is buffered data for the same file, flush it. This can be
     // optimized later by reading from the cache.
     int ret = writeManager.commitBeforeRead(dfsClient, handle, offset + count);
@@ -648,8 +711,8 @@ public class RpcProgramNfs3 extends RpcP
     }
 
     try {
-      int rtmax = config.getInt(Nfs3Constant.MAX_READ_TRANSFER_SIZE_KEY,
-              Nfs3Constant.MAX_READ_TRANSFER_SIZE_DEFAULT);
+      int rtmax = config.getInt(NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY,
+          NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_DEFAULT);
       int buffSize = Math.min(rtmax, count);
       byte[] readbuffer = new byte[buffSize];
 
@@ -661,6 +724,10 @@ public class RpcProgramNfs3 extends RpcP
         FSDataInputStream fis = clientCache.getDfsInputStream(userName,
             Nfs3Utils.getFileIdPath(handle));
 
+        if (fis == null) {
+            return new READ3Response(Nfs3Status.NFS3ERR_ACCES);
+        }
+
         try {
           readCount = fis.read(offset, readbuffer, 0, count);
         } catch (IOException e) {
@@ -693,13 +760,23 @@ public class RpcProgramNfs3 extends RpcP
     } catch (IOException e) {
       LOG.warn("Read error: " + e.getClass() + " offset: " + offset
           + " count: " + count, e);
-      return new READ3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READ3Response(status);
     }
   }
 
   @Override
-  public WRITE3Response write(XDR xdr, Channel channel, int xid,
-      SecurityHandler securityHandler, InetAddress client) {
+  public WRITE3Response write(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    RpcCall rpcCall = (RpcCall) info.header();
+    int xid = rpcCall.getXid();
+    SocketAddress remoteAddress = info.remoteAddress();
+    return write(xdr, info.channel(), xid, securityHandler, remoteAddress);
+  }
+
+  @VisibleForTesting
+  WRITE3Response write(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, SocketAddress remoteAddress) {
     WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
 
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
@@ -707,7 +784,7 @@ public class RpcProgramNfs3 extends RpcP
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     WRITE3Request request = null;
 
     try {
@@ -740,13 +817,13 @@ public class RpcProgramNfs3 extends RpcP
         LOG.error("Can't get path for fileId:" + handle.getFileId());
         return new WRITE3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new WRITE3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr), 0, stableHow,
             Nfs3Constant.WRITE_COMMIT_VERF);
       }
-      
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("requesed offset=" + offset + " and current filesize="
             + preOpAttr.getSize());
@@ -766,23 +843,30 @@ public class RpcProgramNfs3 extends RpcP
       }
       WccAttr attr = preOpAttr == null ? null : Nfs3Utils.getWccAttr(preOpAttr);
       WccData fileWcc = new WccData(attr, postOpAttr);
-      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, 0,
-          request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+
+      int status = mapErrorStatus(e);
+      return new WRITE3Response(status, fileWcc, 0, request.getStableHow(),
+          Nfs3Constant.WRITE_COMMIT_VERF);
     }
 
     return null;
   }
 
   @Override
-  public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public CREATE3Response create(XDR xdr, RpcInfo info) {
+    return create(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     CREATE3Request request = null;
 
     try {
@@ -820,8 +904,8 @@ public class RpcProgramNfs3 extends RpcP
         LOG.error("Can't get path for dirHandle:" + dirHandle);
         return new CREATE3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, null,
             preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
                 preOpDirAttr));
@@ -833,15 +917,15 @@ public class RpcProgramNfs3 extends RpcP
       FsPermission permission = setAttr3.getUpdateFields().contains(
           SetAttrField.MODE) ? new FsPermission((short) setAttr3.getMode())
           : FsPermission.getDefault().applyUMask(umask);
-          
+
       EnumSet<CreateFlag> flag = (createMode != Nfs3Constant.CREATE_EXCLUSIVE) ? 
           EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : 
           EnumSet.of(CreateFlag.CREATE);
-      
+
       fos = new HdfsDataOutputStream(dfsClient.create(fileIdPath, permission,
           flag, false, replication, blockSize, null, bufferSize, null),
           statistics);
-      
+
       if ((createMode == Nfs3Constant.CREATE_UNCHECKED)
           || (createMode == Nfs3Constant.CREATE_GUARDED)) {
         // Set group if it's not specified in the request.
@@ -855,10 +939,11 @@ public class RpcProgramNfs3 extends RpcP
       postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           dfsClient, dirFileIdPath, iug);
-      
+
       // Add open stream
       OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
-          writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
+          writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
+          aixCompatMode);
       fileHandle = new FileHandle(postOpObjAttr.getFileId());
       if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
         LOG.warn("Can't add more stream, close it."
@@ -871,7 +956,7 @@ public class RpcProgramNfs3 extends RpcP
               + fileHandle.getFileId());
         }
       }
-      
+
     } catch (IOException e) {
       LOG.error("Exception", e);
       if (fos != null) {
@@ -891,29 +976,30 @@ public class RpcProgramNfs3 extends RpcP
               + dirHandle.getFileId(), e1);
         }
       }
-      if (e instanceof AccessControlException) {
-        return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, fileHandle,
-            postOpObjAttr, dirWcc);
-      } else {
-        return new CREATE3Response(Nfs3Status.NFS3ERR_IO, fileHandle,
-            postOpObjAttr, dirWcc);
-      }
+
+      int status = mapErrorStatus(e);
+      return new CREATE3Response(status, fileHandle, postOpObjAttr, dirWcc);
     }
-    
+
     return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr,
         dirWcc);
   }
 
   @Override
-  public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public MKDIR3Response mkdir(XDR xdr, RpcInfo info) {
+    return mkdir(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     MKDIR3Request request = null;
 
     try {
@@ -943,11 +1029,11 @@ public class RpcProgramNfs3 extends RpcP
         return new MKDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
 
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new MKDIR3Response(Nfs3Status.NFS3ERR_ACCES, null, preOpDirAttr,
             new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr));
       }
-      
+
       final String fileIdPath = dirFileIdPath + "/" + fileName;
       SetAttr3 setAttr3 = request.getObjAttr();
       FsPermission permission = setAttr3.getUpdateFields().contains(
@@ -966,7 +1052,7 @@ public class RpcProgramNfs3 extends RpcP
         setAttr3.setGid(securityHandler.getGid());
       }
       setattrInternal(dfsClient, fileIdPath, setAttr3, false);
-      
+
       postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       objFileHandle = new FileHandle(postOpObjAttr.getFileId());
       WccData dirWcc = Nfs3Utils.createWccData(
@@ -983,33 +1069,34 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e);
         }
       }
+
       WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           postOpDirAttr);
-      if (e instanceof AccessControlException) {
-        return new MKDIR3Response(Nfs3Status.NFS3ERR_PERM, objFileHandle,
-            postOpObjAttr, dirWcc);
-      } else {
-        return new MKDIR3Response(Nfs3Status.NFS3ERR_IO, objFileHandle,
-            postOpObjAttr, dirWcc);
-      }
+      int status = mapErrorStatus(e);
+      return new MKDIR3Response(status, objFileHandle, postOpObjAttr, dirWcc);
     }
   }
 
-  public READDIR3Response mknod(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  @Override
+  public READDIR3Response mknod(XDR xdr, RpcInfo info) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
-  
+
   @Override
-  public REMOVE3Response remove(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public REMOVE3Response remove(XDR xdr, RpcInfo info) {
+    return remove(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     REMOVE3Request request = null;
     try {
       request = new REMOVE3Request(xdr);
@@ -1034,17 +1121,19 @@ public class RpcProgramNfs3 extends RpcP
         return new REMOVE3Response(Nfs3Status.NFS3ERR_STALE);
       }
 
+      WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+          preOpDirAttr);
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
+        return new REMOVE3Response(Nfs3Status.NFS3ERR_ACCES, errWcc);
+      }
+
       String fileIdPath = dirFileIdPath + "/" + fileName;
       HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
       if (fstat == null) {
-        WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
-            preOpDirAttr);
-        return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, dirWcc);
+        return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
       }
       if (fstat.isDir()) {
-        WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
-            preOpDirAttr);
-        return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, dirWcc);
+        return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, errWcc);
       }
 
       boolean result = dfsClient.delete(fileIdPath, false);
@@ -1065,26 +1154,29 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e1);
         }
       }
+
       WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           postOpDirAttr);
-      if (e instanceof AccessControlException) {
-        return new REMOVE3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
-      } else {
-        return new REMOVE3Response(Nfs3Status.NFS3ERR_IO, dirWcc);
-      }
+      int status = mapErrorStatus(e);
+      return new REMOVE3Response(status, dirWcc);
     }
   }
 
   @Override
-  public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public RMDIR3Response rmdir(XDR xdr, RpcInfo info) {
+    return rmdir(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     RMDIR3Request request = null;
     try {
       request = new RMDIR3Request(xdr);
@@ -1109,10 +1201,10 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for dir fileId:" + dirHandle.getFileId());
         return new RMDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           preOpDirAttr);
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, errWcc); 
       }
 
@@ -1124,7 +1216,7 @@ public class RpcProgramNfs3 extends RpcP
       if (!fstat.isDir()) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTDIR, errWcc);
       }
-      
+
       if (fstat.getChildrenNum() > 0) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTEMPTY, errWcc);
       }
@@ -1147,26 +1239,29 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e1);
         }
       }
+
       WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           postOpDirAttr);
-      if (e instanceof AccessControlException) {
-        return new RMDIR3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
-      } else {
-        return new RMDIR3Response(Nfs3Status.NFS3ERR_IO, dirWcc);
-      }
+      int status = mapErrorStatus(e);
+      return new RMDIR3Response(status, dirWcc);
     }
   }
 
   @Override
-  public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public RENAME3Response rename(XDR xdr, RpcInfo info) {
+    return rename(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     RENAME3Request request = null;
     try {
       request = new RENAME3Request(xdr);
@@ -1203,8 +1298,8 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for toHandle fileId:" + toHandle.getFileId());
         return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         WccData fromWcc = new WccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
             fromPreOpAttr);
         WccData toWcc = new WccData(Nfs3Utils.getWccAttr(toPreOpAttr),
@@ -1225,7 +1320,7 @@ public class RpcProgramNfs3 extends RpcP
       return new RENAME3Response(Nfs3Status.NFS3_OK, fromDirWcc, toDirWcc);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      // Try to return correct WccData      
+      // Try to return correct WccData
       try {
         fromDirWcc = Nfs3Utils.createWccData(
             Nfs3Utils.getWccAttr(fromPreOpAttr), dfsClient, fromDirFileIdPath,
@@ -1236,21 +1331,23 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get postOpDirAttr for " + fromDirFileIdPath + " or"
             + toDirFileIdPath, e1);
       }
-      if (e instanceof AccessControlException) {
-        return new RENAME3Response(Nfs3Status.NFS3ERR_PERM, fromDirWcc,
-            toDirWcc);
-      } else {
-        return new RENAME3Response(Nfs3Status.NFS3ERR_IO, fromDirWcc, toDirWcc);
-      }
+
+      int status = mapErrorStatus(e);
+      return new RENAME3Response(status, fromDirWcc, toDirWcc);
     }
   }
 
   @Override
-  public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public SYMLINK3Response symlink(XDR xdr, RpcInfo info) {
+    return symlink(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
 
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
@@ -1300,13 +1397,14 @@ public class RpcProgramNfs3 extends RpcP
 
     } catch (IOException e) {
       LOG.warn("Exception:" + e);
-      response.setStatus(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      response.setStatus(status);
       return response;
     }
   }
 
-  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  @Override
+  public READDIR3Response link(XDR xdr, RpcInfo info) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
@@ -1332,23 +1430,27 @@ public class RpcProgramNfs3 extends RpcP
     }
     return dlisting;
   }
-  
+
   @Override
+  public READDIR3Response readdir(XDR xdr, RpcInfo info) {
+    return readdir(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
   public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+      SocketAddress remoteAddress) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     READDIR3Request request = null;
     try {
       request = new READDIR3Request(xdr);
@@ -1367,7 +1469,7 @@ public class RpcProgramNfs3 extends RpcP
       LOG.info("Nonpositive count in invalid READDIR request:" + count);
       return new READDIR3Response(Nfs3Status.NFS3_OK);
     }
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READDIR fileId: " + handle.getFileId() + " cookie: "
           + cookie + " count: " + count);
@@ -1391,9 +1493,24 @@ public class RpcProgramNfs3 extends RpcP
       }
       long cookieVerf = request.getCookieVerf();
       if ((cookieVerf != 0) && (cookieVerf != dirStatus.getModificationTime())) {
-        LOG.error("CookierVerf mismatch. request cookierVerf:" + cookieVerf
-            + " dir cookieVerf:" + dirStatus.getModificationTime());
-        return new READDIR3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        if (aixCompatMode) {
+          // The AIX NFS client misinterprets RFC-1813 and will repeatedly send
+          // the same cookieverf value even across VFS-level readdir calls,
+          // instead of getting a new cookieverf for every VFS-level readdir
+          // call, and reusing the cookieverf only in the event that multiple
+          // incremental NFS-level readdir calls must be made to fetch all of
+          // the directory entries. This means that whenever a readdir call is
+          // made by an AIX NFS client for a given directory, and that directory
+          // is subsequently modified, thus changing its mtime, no later readdir
+          // calls will succeed from AIX for that directory until the FS is
+          // unmounted/remounted. See HDFS-6549 for more info.
+          LOG.warn("AIX compatibility mode enabled, ignoring cookieverf " +
+              "mismatches.");
+        } else {
+          LOG.error("CookieVerf mismatch. request cookieVerf: " + cookieVerf
+              + " dir cookieVerf: " + dirStatus.getModificationTime());
+          return new READDIR3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        }
       }
 
       if (cookie == 0) {
@@ -1417,7 +1534,7 @@ public class RpcProgramNfs3 extends RpcP
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      
+
       dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpAttr == null) {
@@ -1426,21 +1543,22 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new READDIR3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READDIR3Response(status);
     }
 
     /**
      * Set up the dirents in the response. fileId is used as the cookie with one
      * exception. Linux client can either be stuck with "ls" command (on REHL)
      * or report "Too many levels of symbolic links" (Ubuntu).
-     * 
+     *
      * The problem is that, only two items returned, "." and ".." when the
      * namespace is empty. Both of them are "/" with the same cookie(root
      * fileId). Linux client doesn't think such a directory is a real directory.
      * Even though NFS protocol specifies cookie is an opaque data, Linux client
      * somehow doesn't like an empty dir returns same cookie for both "." and
      * "..".
-     * 
+     *
      * The workaround is to use 0 as the cookie for "." and always return "." as
      * the first entry in readdir/readdirplus response.
      */
@@ -1448,7 +1566,7 @@ public class RpcProgramNfs3 extends RpcP
     int n = (int) Math.min(fstatus.length, count-2);
     boolean eof = (n < fstatus.length) ? false : (dlisting
         .getRemainingEntries() == 0);
-    
+
     Entry3[] entries;
     if (cookie == 0) {
       entries = new Entry3[n + 2];
@@ -1468,23 +1586,29 @@ public class RpcProgramNfs3 extends RpcP
             fstatus[i].getLocalName(), fstatus[i].getFileId());
       }
     }
-    
+
     DirList3 dirList = new READDIR3Response.DirList3(entries, eof);
     return new READDIR3Response(Nfs3Status.NFS3_OK, postOpAttr,
         dirStatus.getModificationTime(), dirList);
   }
 
-  public READDIRPLUS3Response readdirplus(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+  @Override
+  public READDIRPLUS3Response readdirplus(XDR xdr, RpcInfo info) {
+    return readdirplus(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
     }
-    
+
     READDIRPLUS3Request request = null;
     try {
       request = new READDIRPLUS3Request(xdr);
@@ -1509,7 +1633,7 @@ public class RpcProgramNfs3 extends RpcP
       LOG.info("Nonpositive maxcount in invalid READDIRPLUS request:" + maxCount);
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
     }
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: "
           + cookie + " dirCount: " + dirCount + " maxCount: " + maxCount);
@@ -1533,9 +1657,22 @@ public class RpcProgramNfs3 extends RpcP
       }
       long cookieVerf = request.getCookieVerf();
       if ((cookieVerf != 0) && (cookieVerf != dirStatus.getModificationTime())) {
-        LOG.error("CookierVerf mismatch. request cookierVerf:" + cookieVerf
-            + " dir cookieVerf:" + dirStatus.getModificationTime());
-        return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        if (aixCompatMode) {
+          // The AIX NFS client misinterprets RFC-1813 and will repeatedly send
+          // the same cookieverf value even across VFS-level readdir calls,
+          // instead of getting a new cookieverf for every VFS-level readdir
+          // call. This means that whenever a readdir call is made by an AIX NFS
+          // client for a given directory, and that directory is subsequently
+          // modified, thus changing its mtime, no later readdir calls will
+          // succeed for that directory from AIX until the FS is
+          // unmounted/remounted. See HDFS-6549 for more info.
+          LOG.warn("AIX compatibility mode enabled, ignoring cookieverf " +
+              "mismatches.");
+        } else {
+          LOG.error("cookieverf mismatch. request cookieverf: " + cookieVerf
+              + " dir cookieverf: " + dirStatus.getModificationTime());
+          return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        }
       }
 
       if (cookie == 0) {
@@ -1559,7 +1696,7 @@ public class RpcProgramNfs3 extends RpcP
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      
+
       dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpDirAttr == null) {
@@ -1568,19 +1705,20 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READDIRPLUS3Response(status);
     }
-    
+
     // Set up the dirents in the response
     HdfsFileStatus[] fstatus = dlisting.getPartialListing();
     int n = (int) Math.min(fstatus.length, dirCount-2);
     boolean eof = (n < fstatus.length) ? false : (dlisting
         .getRemainingEntries() == 0);
-    
+
     READDIRPLUS3Response.EntryPlus3[] entries;
     if (cookie == 0) {
       entries = new READDIRPLUS3Response.EntryPlus3[n+2];
-      
+
       entries[0] = new READDIRPLUS3Response.EntryPlus3(
           postOpDirAttr.getFileId(), ".", 0, postOpDirAttr, new FileHandle(
               postOpDirAttr.getFileId()));
@@ -1624,23 +1762,28 @@ public class RpcProgramNfs3 extends RpcP
     return new READDIRPLUS3Response(Nfs3Status.NFS3_OK, postOpDirAttr,
         dirStatus.getModificationTime(), dirListPlus);
   }
-  
+
   @Override
-  public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public FSSTAT3Response fsstat(XDR xdr, RpcInfo info) {
+    return fsstat(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     FSSTAT3Request request = null;
     try {
       request = new FSSTAT3Request(xdr);
@@ -1655,18 +1798,17 @@ public class RpcProgramNfs3 extends RpcP
     }
 
     try {
-      // Use superUserClient to get file system status
-      FsStatus fsStatus = superUserClient.getDiskStatus();
+      FsStatus fsStatus = dfsClient.getDiskStatus();
       long totalBytes = fsStatus.getCapacity();
       long freeBytes = fsStatus.getRemaining();
-      
+
       Nfs3FileAttributes attrs = writeManager.getFileAttr(dfsClient, handle,
           iug);
       if (attrs == null) {
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         return new FSSTAT3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       long maxFsObjects = config.getLong("dfs.max.objects", 0);
       if (maxFsObjects == 0) {
         // A value of zero in HDFS indicates no limit to the number
@@ -1674,31 +1816,48 @@ public class RpcProgramNfs3 extends RpcP
         // Long.MAX_VALUE so 32bit client won't complain.
         maxFsObjects = Integer.MAX_VALUE;
       }
-      
+
       return new FSSTAT3Response(Nfs3Status.NFS3_OK, attrs, totalBytes,
           freeBytes, freeBytes, maxFsObjects, maxFsObjects, maxFsObjects, 0);
+    } catch (RemoteException r) {
+      LOG.warn("Exception ", r);
+      IOException io = r.unwrapRemoteException();
+      /**
+       * AuthorizationException can be thrown if the user can't be proxy'ed.
+       */
+      if (io instanceof AuthorizationException) {
+        return new FSSTAT3Response(Nfs3Status.NFS3ERR_ACCES);
+      } else {
+        return new FSSTAT3Response(Nfs3Status.NFS3ERR_IO);
+      }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new FSSTAT3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new FSSTAT3Response(status);
     }
   }
 
   @Override
-  public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public FSINFO3Response fsinfo(XDR xdr, RpcInfo info) {
+    return fsinfo(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     FSINFO3Request request = null;
     try {
       request = new FSINFO3Request(xdr);
@@ -1713,12 +1872,15 @@ public class RpcProgramNfs3 extends RpcP
     }
 
     try {
-      int rtmax = config.getInt(Nfs3Constant.MAX_READ_TRANSFER_SIZE_KEY,
-              Nfs3Constant.MAX_READ_TRANSFER_SIZE_DEFAULT);
-      int wtmax = config.getInt(Nfs3Constant.MAX_WRITE_TRANSFER_SIZE_KEY,
-              Nfs3Constant.MAX_WRITE_TRANSFER_SIZE_DEFAULT);
-      int dtperf = config.getInt(Nfs3Constant.MAX_READDIR_TRANSFER_SIZE_KEY,
-              Nfs3Constant.MAX_READDIR_TRANSFER_SIZE_DEFAULT);
+      int rtmax = config.getInt(
+          NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY,
+          NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_DEFAULT);
+      int wtmax = config.getInt(
+          NfsConfigKeys.DFS_NFS_MAX_WRITE_TRANSFER_SIZE_KEY,
+          NfsConfigKeys.DFS_NFS_MAX_WRITE_TRANSFER_SIZE_DEFAULT);
+      int dtperf = config.getInt(
+          NfsConfigKeys.DFS_NFS_MAX_READDIR_TRANSFER_SIZE_KEY,
+          NfsConfigKeys.DFS_NFS_MAX_READDIR_TRANSFER_SIZE_DEFAULT);
 
       Nfs3FileAttributes attrs = Nfs3Utils.getFileAttr(dfsClient,
           Nfs3Utils.getFileIdPath(handle), iug);
@@ -1726,7 +1888,7 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         return new FSINFO3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       int fsProperty = Nfs3Constant.FSF3_CANSETTIME
           | Nfs3Constant.FSF3_HOMOGENEOUS;
 
@@ -1734,26 +1896,32 @@ public class RpcProgramNfs3 extends RpcP
           wtmax, wtmax, 1, dtperf, Long.MAX_VALUE, new NfsTime(1), fsProperty);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new FSINFO3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new FSINFO3Response(status);
     }
   }
 
   @Override
-  public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public PATHCONF3Response pathconf(XDR xdr, RpcInfo info) {
+    return pathconf(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     PATHCONF3Request request = null;
     try {
       request = new PATHCONF3Request(xdr);
@@ -1781,20 +1949,30 @@ public class RpcProgramNfs3 extends RpcP
           HdfsConstants.MAX_PATH_LENGTH, true, false, false, true);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new PATHCONF3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new PATHCONF3Response(status);
     }
   }
 
   @Override
-  public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
-      SecurityHandler securityHandler, InetAddress client) {
+  public COMMIT3Response commit(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    RpcCall rpcCall = (RpcCall) info.header();
+    int xid = rpcCall.getXid();
+    SocketAddress remoteAddress = info.remoteAddress();
+    return commit(xdr, info.channel(), xid, securityHandler, remoteAddress);
+  }
+
+  @VisibleForTesting
+  COMMIT3Response commit(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, SocketAddress remoteAddress) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     COMMIT3Request request = null;
     try {
       request = new COMMIT3Request(xdr);
@@ -1818,19 +1996,19 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         return new COMMIT3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new COMMIT3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr),
             Nfs3Constant.WRITE_COMMIT_VERF);
       }
-      
+
       long commitOffset = (request.getCount() == 0) ? 0
           : (request.getOffset() + request.getCount());
-      
+
       // Insert commit as an async request
-      writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
-          preOpAttr);
+      writeManager.handleCommit(dfsClient, handle, commitOffset,
+          channel, xid, preOpAttr);
       return null;
     } catch (IOException e) {
       LOG.warn("Exception ", e);
@@ -1840,9 +2018,11 @@ public class RpcProgramNfs3 extends RpcP
       } catch (IOException e1) {
         LOG.info("Can't get postOpAttr for fileId: " + handle.getFileId(), e1);
       }
+
       WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
-      return new COMMIT3Response(Nfs3Status.NFS3ERR_IO, fileWcc,
-          Nfs3Constant.WRITE_COMMIT_VERF);
+      int status = mapErrorStatus(e);
+      return new COMMIT3Response(status, fileWcc,
+        Nfs3Constant.WRITE_COMMIT_VERF);
     }
   }
 
@@ -1855,11 +2035,16 @@ public class RpcProgramNfs3 extends RpcP
       return null;
     }
   }
-  
+
+  private SecurityHandler getSecurityHandler(RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
+    return getSecurityHandler(rpcCall.getCredential(), rpcCall.getVerifier());
+  }
+
   @Override
   public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
     RpcCall rpcCall = (RpcCall) info.header();
-    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
+    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());    
     int xid = rpcCall.getXid();
     byte[] data = new byte[info.data().readableBytes()];
     info.data().readBytes(data);
@@ -1867,9 +2052,8 @@ public class RpcProgramNfs3 extends RpcP
     XDR out = new XDR();
     InetAddress client = ((InetSocketAddress) info.remoteAddress())
         .getAddress();
-    Channel channel = info.channel();
-
     Credentials credentials = rpcCall.getCredential();
+
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
       if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
@@ -1906,28 +2090,25 @@ public class RpcProgramNfs3 extends RpcP
         }
       }
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(credentials,
-        rpcCall.getVerifier());
-    
+
     NFS3Response response = null;
     if (nfsproc3 == NFSPROC3.NULL) {
       response = nullProcedure();
     } else if (nfsproc3 == NFSPROC3.GETATTR) {
-      response = getattr(xdr, securityHandler, client);
+      response = getattr(xdr, info);
     } else if (nfsproc3 == NFSPROC3.SETATTR) {
-      response = setattr(xdr, securityHandler, client);
+      response = setattr(xdr, info);
     } else if (nfsproc3 == NFSPROC3.LOOKUP) {
-      response = lookup(xdr, securityHandler, client);
+      response = lookup(xdr, info);
     } else if (nfsproc3 == NFSPROC3.ACCESS) {
-      response = access(xdr, securityHandler, client);
+      response = access(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READLINK) {
-      response = readlink(xdr, securityHandler, client);
+      response = readlink(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READ) {
       if (LOG.isDebugEnabled()) {
           LOG.debug(Nfs3Utils.READ_RPC_START + xid);
-      }    
-      response = read(xdr, securityHandler, client);
+      }
+      response = read(xdr, info);
       if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
         LOG.debug(Nfs3Utils.READ_RPC_END + xid);
       }
@@ -1935,36 +2116,36 @@ public class RpcProgramNfs3 extends RpcP
       if (LOG.isDebugEnabled()) {
           LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
       }
-      response = write(xdr, channel, xid, securityHandler, client);
+      response = write(xdr, info);
       // Write end debug trace is in Nfs3Utils.writeChannel
     } else if (nfsproc3 == NFSPROC3.CREATE) {
-      response = create(xdr, securityHandler, client);
-    } else if (nfsproc3 == NFSPROC3.MKDIR) {      
-      response = mkdir(xdr, securityHandler, client);
+      response = create(xdr, info);
+    } else if (nfsproc3 == NFSPROC3.MKDIR) {
+      response = mkdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.SYMLINK) {
-      response = symlink(xdr, securityHandler, client);
+      response = symlink(xdr, info);
     } else if (nfsproc3 == NFSPROC3.MKNOD) {
-      response = mknod(xdr, securityHandler, client);
+      response = mknod(xdr, info);
     } else if (nfsproc3 == NFSPROC3.REMOVE) {
-      response = remove(xdr, securityHandler, client);
+      response = remove(xdr, info);
     } else if (nfsproc3 == NFSPROC3.RMDIR) {
-      response = rmdir(xdr, securityHandler, client);
+      response = rmdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.RENAME) {
-      response = rename(xdr, securityHandler, client);
+      response = rename(xdr, info);
     } else if (nfsproc3 == NFSPROC3.LINK) {
-      response = link(xdr, securityHandler, client);
+      response = link(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READDIR) {
-      response = readdir(xdr, securityHandler, client);
+      response = readdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
-      response = readdirplus(xdr, securityHandler, client);
+      response = readdirplus(xdr, info);
     } else if (nfsproc3 == NFSPROC3.FSSTAT) {
-      response = fsstat(xdr, securityHandler, client);
+      response = fsstat(xdr, info);
     } else if (nfsproc3 == NFSPROC3.FSINFO) {
-      response = fsinfo(xdr, securityHandler, client);
+      response = fsinfo(xdr, info);
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
-      response = pathconf(xdr, securityHandler, client);
+      response = pathconf(xdr,info);
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
-      response = commit(xdr, channel, xid, securityHandler, client);
+      response = commit(xdr, info);
     } else {
       // Invalid procedure
       RpcAcceptedReply.getInstance(xid,
@@ -1990,15 +2171,25 @@ public class RpcProgramNfs3 extends RpcP
 
     RpcUtil.sendRpcResponse(ctx, rsp);
   }
-  
+
   @Override
   protected boolean isIdempotent(RpcCall call) {
-    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(call.getProcedure()); 
+    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(call.getProcedure());
     return nfsproc3 == null || nfsproc3.isIdempotent();
   }
-  
-  private boolean checkAccessPrivilege(final InetAddress client,
+
+  private boolean checkAccessPrivilege(SocketAddress remoteAddress,
       final AccessPrivilege expected) {
+    // Port monitoring
+    if (!doPortMonitoring(remoteAddress)) {
+      return false;
+    }
+
+    // Check export table
+    if (exports == null) {
+        return false;
+    }
+    InetAddress client = ((InetSocketAddress) remoteAddress).getAddress();
     AccessPrivilege access = exports.getAccessPrivilege(client);
     if (access == AccessPrivilege.NONE) {
       return false;
@@ -2009,7 +2200,7 @@ public class RpcProgramNfs3 extends RpcP
     }
     return true;
   }
-  
+
   @VisibleForTesting
   WriteManager getWriteManager() {
     return this.writeManager;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Tue Aug 19 23:49:39 2014
@@ -21,10 +21,11 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.ipc.RemoteException;
@@ -50,13 +51,14 @@ import com.google.common.annotations.Vis
 public class WriteManager {
   public static final Log LOG = LogFactory.getLog(WriteManager.class);
 
-  private final Configuration config;
+  private final NfsConfiguration config;
   private final IdUserGroup iug;
  
   private AsyncDataService asyncDataService;
   private boolean asyncDataServiceStarted = false;
 
   private final int maxStreams;
+  private final boolean aixCompatMode;
 
   /**
    * The time limit to wait for accumulate reordered sequential writes to the
@@ -78,19 +80,21 @@ public class WriteManager {
     return fileContextCache.put(h, ctx);
   }
   
-  WriteManager(IdUserGroup iug, final Configuration config) {
+  WriteManager(IdUserGroup iug, final NfsConfiguration config,
+      boolean aixCompatMode) {
     this.iug = iug;
     this.config = config;
-    streamTimeout = config.getLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT,
-        Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
+    this.aixCompatMode = aixCompatMode;
+    streamTimeout = config.getLong(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY,
+        NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT);
     LOG.info("Stream timeout is " + streamTimeout + "ms.");
-    if (streamTimeout < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
+    if (streamTimeout < NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT) {
       LOG.info("Reset stream timeout to minimum value "
-          + Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + "ms.");
-      streamTimeout = Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT;
+          + NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + "ms.");
+      streamTimeout = NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT;
     }
-    maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES,
-        Nfs3Constant.MAX_OPEN_FILES_DEFAULT);
+    maxStreams = config.getInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY,
+        NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_DEFAULT);
     LOG.info("Maximum open streams is "+ maxStreams);
     this.fileContextCache = new OpenFileCtxCache(config, streamTimeout);
   }
@@ -171,10 +175,10 @@ public class WriteManager {
       }
 
       // Add open stream
-      String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
-          Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
+      String writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
+          NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
       openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
-          + fileHandle.getFileId(), dfsClient, iug);
+          + fileHandle.getFileId(), dfsClient, iug, aixCompatMode);
 
       if (!addOpenFileStream(fileHandle, openFileCtx)) {
         LOG.info("Can't add new stream. Close it. Tell client to retry.");

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java Tue Aug 19 23:49:39 2014
@@ -23,8 +23,8 @@ import java.net.InetAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
 import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;
 import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3;
@@ -38,7 +38,7 @@ public class TestMountd {
   @Test
   public void testStart() throws IOException {
     // Start minicluster
-    Configuration config = new Configuration();
+    NfsConfiguration config = new NfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(1)
         .build();
     cluster.waitActive();

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,8 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
@@ -155,10 +156,10 @@ public class TestOutOfOrderWrite {
     Arrays.fill(data3, (byte) 9);
 
     // NFS3 Create request
-    Configuration conf = new Configuration();
+    NfsConfiguration conf = new NfsConfiguration();
     WriteClient client = new WriteClient("localhost", conf.getInt(
-        Nfs3Constant.NFS3_SERVER_PORT, Nfs3Constant.NFS3_SERVER_PORT_DEFAULT),
-        create(), false);
+        NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
+        NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), create(), false);
     client.run();
 
     while (handle == null) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java Tue Aug 19 23:49:39 2014
@@ -18,24 +18,24 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertThat;
-import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
 
 public class TestDFSClientCache {
   @Test
   public void testEviction() throws IOException {
-    Configuration conf = new Configuration();
+    NfsConfiguration conf = new NfsConfiguration();
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
 
     // Only one entry will be in the cache
@@ -59,7 +59,7 @@ public class TestDFSClientCache {
     String currentUser = "test-user";
 
 
-    Configuration conf = new Configuration();
+    NfsConfiguration conf = new NfsConfiguration();
     UserGroupInformation currentUserUgi
             = UserGroupInformation.createRemoteUser(currentUser);
     currentUserUgi.setAuthenticationMethod(KERBEROS);
@@ -83,7 +83,7 @@ public class TestDFSClientCache {
 
     UserGroupInformation currentUserUgi = UserGroupInformation
             .createUserForTesting(currentUser, new String[0]);
-    Configuration conf = new Configuration();
+    NfsConfiguration conf = new NfsConfiguration();
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
     DFSClientCache cache = new DFSClientCache(conf);
     UserGroupInformation ugiResult

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java Tue Aug 19 23:49:39 2014
@@ -21,22 +21,22 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.hdfs.nfs.mount.Mountd;
 import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
-import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
 import org.junit.Test;
 
 public class TestExportsTable {
  
   @Test
   public void testExportPoint() throws IOException {
-    Configuration config = new Configuration();
+    NfsConfiguration config = new NfsConfiguration();
     MiniDFSCluster cluster = null;
 
     String exportPoint = "/myexport1";
-    config.setStrings(Nfs3Constant.EXPORT_POINT, exportPoint);
+    config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, exportPoint);
     // Use emphral port in case tests are running in parallel
     config.setInt("nfs3.mountd.port", 0);
     config.setInt("nfs3.server.port", 0);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java Tue Aug 19 23:49:39 2014
@@ -22,13 +22,13 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
-import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -38,10 +38,10 @@ public class TestOpenFileCtxCache {
 
   @Test
   public void testEviction() throws IOException, InterruptedException {
-    Configuration conf = new Configuration();
+    NfsConfiguration conf = new NfsConfiguration();
 
     // Only two entries will be in the cache
-    conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);
+    conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);
 
     DFSClient dfsClient = Mockito.mock(DFSClient.class);
     Nfs3FileAttributes attr = new Nfs3FileAttributes();
@@ -49,15 +49,15 @@ public class TestOpenFileCtxCache {
     Mockito.when(fos.getPos()).thenReturn((long) 0);
 
     OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
-        dfsClient, new IdUserGroup());
+        dfsClient, new IdUserGroup(new NfsConfiguration()));
     OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
-        dfsClient, new IdUserGroup());
+        dfsClient, new IdUserGroup(new NfsConfiguration()));
     OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
-        dfsClient, new IdUserGroup());
+        dfsClient, new IdUserGroup(new NfsConfiguration()));
     OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
-        dfsClient, new IdUserGroup());
+        dfsClient, new IdUserGroup(new NfsConfiguration()));
     OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
-        dfsClient, new IdUserGroup());
+        dfsClient, new IdUserGroup(new NfsConfiguration()));
 
     OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);
 
@@ -71,7 +71,7 @@ public class TestOpenFileCtxCache {
     assertTrue(cache.size() == 2);
 
     // Wait for the oldest stream to be evict-able, insert again
-    Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+    Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
     assertTrue(cache.size() == 2);
 
     ret = cache.put(new FileHandle(3), context3);
@@ -90,17 +90,17 @@ public class TestOpenFileCtxCache {
         new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
     context4.getPendingCommitsForTest().put(new Long(100),
         new CommitCtx(0, null, 0, attr));
-    Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+    Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
     ret = cache.put(new FileHandle(5), context5);
     assertFalse(ret);
   }
 
   @Test
   public void testScan() throws IOException, InterruptedException {
-    Configuration conf = new Configuration();
+    NfsConfiguration conf = new NfsConfiguration();
 
     // Only two entries will be in the cache
-    conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);
+    conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);
 
     DFSClient dfsClient = Mockito.mock(DFSClient.class);
     Nfs3FileAttributes attr = new Nfs3FileAttributes();
@@ -108,13 +108,13 @@ public class TestOpenFileCtxCache {
     Mockito.when(fos.getPos()).thenReturn((long) 0);
 
     OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
-        dfsClient, new IdUserGroup());
+        dfsClient, new IdUserGroup(new NfsConfiguration()));
     OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
-        dfsClient, new IdUserGroup());
+        dfsClient, new IdUserGroup(new NfsConfiguration()));
     OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
-        dfsClient, new IdUserGroup());
+        dfsClient, new IdUserGroup(new NfsConfiguration()));
     OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
-        dfsClient, new IdUserGroup());
+        dfsClient, new IdUserGroup(new NfsConfiguration()));
 
     OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);
 
@@ -123,8 +123,8 @@ public class TestOpenFileCtxCache {
     assertTrue(ret);
     ret = cache.put(new FileHandle(2), context2);
     assertTrue(ret);
-    Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + 1);
-    cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+    Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + 1);
+    cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
     assertTrue(cache.size() == 0);
 
     // Test cleaning inactive entry
@@ -133,7 +133,7 @@ public class TestOpenFileCtxCache {
     ret = cache.put(new FileHandle(4), context4);
     assertTrue(ret);
     context3.setActiveStatusForTest(false);
-    cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
+    cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT);
     assertTrue(cache.size() == 1);
     assertTrue(cache.get(new FileHandle(3)) == null);
     assertTrue(cache.get(new FileHandle(4)) != null);