You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2014/08/04 19:40:03 UTC

svn commit: r1615702 - in /hadoop/common/trunk/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/

Author: brandonli
Date: Mon Aug  4 17:40:02 2014
New Revision: 1615702

URL: http://svn.apache.org/r1615702
Log:
HDFS-6451. NFS should not return NFS3ERR_IO for AccessControlException. Contributed by Abhiraj Butala

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1615702&r1=1615701&r2=1615702&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Mon Aug  4 17:40:02 2014
@@ -140,7 +140,7 @@ public class RpcProgramNfs3 extends RpcP
   public static final int DEFAULT_UMASK = 0022;
   public static final FsPermission umask = new FsPermission(
       (short) DEFAULT_UMASK);
-  
+
   static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class);
 
   private final NfsConfiguration config;
@@ -149,14 +149,14 @@ public class RpcProgramNfs3 extends RpcP
   private final DFSClientCache clientCache;
 
   private final NfsExports exports;
-  
+
   private final short replication;
   private final long blockSize;
   private final int bufferSize;
   private final boolean aixCompatMode;
   private Statistics statistics;
   private String writeDumpDir; // The dir save dump files
-  
+
   private final RpcCallCache rpcCallCache;
 
   public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket,
@@ -166,11 +166,11 @@ public class RpcProgramNfs3 extends RpcP
         NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
         Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
         allowInsecurePorts);
-   
+
     this.config = config;
     config.set(FsPermission.UMASK_LABEL, "000");
     iug = new IdUserGroup(config);
-    
+
     aixCompatMode = config.getBoolean(
         NfsConfigKeys.AIX_COMPAT_MODE_KEY,
         NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
@@ -184,7 +184,7 @@ public class RpcProgramNfs3 extends RpcP
     bufferSize = config.getInt(
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-    
+
     writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
         NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
     boolean enableDump = config.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY,
@@ -216,12 +216,23 @@ public class RpcProgramNfs3 extends RpcP
       throw new IOException("Cannot create dump directory " + dumpDir);
     }
   }
-  
+
   @Override
   public void startDaemons() {
      writeManager.startAsyncDataSerivce();
   }
-  
+
+  // Maps an IOException caught by an RPC handler to the Nfs3Status code placed in its response.
+  private int mapErrorStatus(IOException e) {
+    if (e instanceof FileNotFoundException) {
+      return Nfs3Status.NFS3ERR_STALE; // FileNotFoundException (or a subclass)
+    } else if (e instanceof AccessControlException) {
+      return Nfs3Status.NFS3ERR_ACCES; // AccessControlException (or a subclass)
+    } else {
+      return Nfs3Status.NFS3ERR_IO; // fallback for every other IOException
+    }
+  }
+
   /******************************************************
    * RPC call handlers
    ******************************************************/
@@ -236,20 +247,25 @@ public class RpcProgramNfs3 extends RpcP
 
   @Override
   public GETATTR3Response getattr(XDR xdr, RpcInfo info) {
+    return getattr(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     GETATTR3Request request = null;
     try {
       request = new GETATTR3Request(xdr);
@@ -280,7 +296,8 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.info("Can't get file attribute, fileId=" + handle.getFileId(), e);
-      response.setStatus(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      response.setStatus(status);
       return response;
     }
     if (attrs == null) {
@@ -297,7 +314,7 @@ public class RpcProgramNfs3 extends RpcP
   private void setattrInternal(DFSClient dfsClient, String fileIdPath,
       SetAttr3 newAttr, boolean setMode) throws IOException {
     EnumSet<SetAttrField> updateFields = newAttr.getUpdateFields();
-    
+
     if (setMode && updateFields.contains(SetAttrField.MODE)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("set new mode:" + newAttr.getMode());
@@ -328,14 +345,19 @@ public class RpcProgramNfs3 extends RpcP
 
   @Override
   public SETATTR3Response setattr(XDR xdr, RpcInfo info) {
+    return setattr(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     SETATTR3Request request = null;
     try {
       request = new SETATTR3Request(xdr);
@@ -373,9 +395,9 @@ public class RpcProgramNfs3 extends RpcP
           return new SETATTR3Response(Nfs3Status.NFS3ERR_NOT_SYNC, wccData);
         }
       }
-      
+
       // check the write access privilege
-      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             preOpWcc, preOpAttr));
       }
@@ -394,30 +416,33 @@ public class RpcProgramNfs3 extends RpcP
       } catch (IOException e1) {
         LOG.info("Can't get postOpAttr for fileIdPath: " + fileIdPath, e1);
       }
-      if (e instanceof AccessControlException) {
-        return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, wccData);
-      } else {
-        return new SETATTR3Response(Nfs3Status.NFS3ERR_IO, wccData);
-      }
+
+      int status = mapErrorStatus(e);
+      return new SETATTR3Response(status, wccData);
     }
   }
 
   @Override
   public LOOKUP3Response lookup(XDR xdr, RpcInfo info) {
+    return lookup(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     LOOKUP3Request request = null;
     try {
       request = new LOOKUP3Request(xdr);
@@ -460,26 +485,32 @@ public class RpcProgramNfs3 extends RpcP
 
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new LOOKUP3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new LOOKUP3Response(status);
     }
   }
-  
+
   @Override
   public ACCESS3Response access(XDR xdr, RpcInfo info) {
+    return access(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     ACCESS3Request request = null;
     try {
       request = new ACCESS3Request(xdr);
@@ -493,7 +524,7 @@ public class RpcProgramNfs3 extends RpcP
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS ACCESS fileId: " + handle.getFileId());
-    } 
+    }
 
     try {
       // HDFS-5804 removed supserUserClient access
@@ -506,7 +537,7 @@ public class RpcProgramNfs3 extends RpcP
       int access = Nfs3Utils.getAccessRightsForUserGroup(
           securityHandler.getUid(), securityHandler.getGid(),
           securityHandler.getAuxGids(), attrs);
-      
+
       return new ACCESS3Response(Nfs3Status.NFS3_OK, attrs, access);
     } catch (RemoteException r) {
       LOG.warn("Exception ", r);
@@ -521,20 +552,26 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new ACCESS3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new ACCESS3Response(status);
     }
   }
 
   @Override
   public READLINK3Response readlink(XDR xdr, RpcInfo info) {
+    return readlink(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
 
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -588,39 +625,33 @@ public class RpcProgramNfs3 extends RpcP
 
     } catch (IOException e) {
       LOG.warn("Readlink error: " + e.getClass(), e);
-      if (e instanceof FileNotFoundException) {
-        return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
-      } else if (e instanceof AccessControlException) {
-        return new READLINK3Response(Nfs3Status.NFS3ERR_ACCES);
-      }
-      return new READLINK3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READLINK3Response(status);
     }
   }
 
   @Override
   public READ3Response read(XDR xdr, RpcInfo info) {
-    SecurityHandler securityHandler = getSecurityHandler(info);
-    SocketAddress remoteAddress = info.remoteAddress();
-    return read(xdr, securityHandler, remoteAddress);
+    return read(xdr, getSecurityHandler(info), info.remoteAddress());
   }
-  
+
   @VisibleForTesting
   READ3Response read(XDR xdr, SecurityHandler securityHandler,
       SocketAddress remoteAddress) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
     final String userName = securityHandler.getUser();
-    
+
     if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(userName);
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     READ3Request request = null;
 
     try {
@@ -670,7 +701,7 @@ public class RpcProgramNfs3 extends RpcP
         return new READ3Response(Nfs3Status.NFS3ERR_ACCES);
       }
     }
-    
+
     // In case there is buffered data for the same file, flush it. This can be
     // optimized later by reading from the cache.
     int ret = writeManager.commitBeforeRead(dfsClient, handle, offset + count);
@@ -725,7 +756,8 @@ public class RpcProgramNfs3 extends RpcP
     } catch (IOException e) {
       LOG.warn("Read error: " + e.getClass() + " offset: " + offset
           + " count: " + count, e);
-      return new READ3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READ3Response(status);
     }
   }
 
@@ -737,7 +769,7 @@ public class RpcProgramNfs3 extends RpcP
     SocketAddress remoteAddress = info.remoteAddress();
     return write(xdr, info.channel(), xid, securityHandler, remoteAddress);
   }
-  
+
   @VisibleForTesting
   WRITE3Response write(XDR xdr, Channel channel, int xid,
       SecurityHandler securityHandler, SocketAddress remoteAddress) {
@@ -748,7 +780,7 @@ public class RpcProgramNfs3 extends RpcP
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     WRITE3Request request = null;
 
     try {
@@ -781,13 +813,13 @@ public class RpcProgramNfs3 extends RpcP
         LOG.error("Can't get path for fileId:" + handle.getFileId());
         return new WRITE3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new WRITE3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr), 0, stableHow,
             Nfs3Constant.WRITE_COMMIT_VERF);
       }
-      
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("requesed offset=" + offset + " and current filesize="
             + preOpAttr.getSize());
@@ -807,8 +839,10 @@ public class RpcProgramNfs3 extends RpcP
       }
       WccAttr attr = preOpAttr == null ? null : Nfs3Utils.getWccAttr(preOpAttr);
       WccData fileWcc = new WccData(attr, postOpAttr);
-      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, 0,
-          request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+
+      int status = mapErrorStatus(e);
+      return new WRITE3Response(status, fileWcc, 0, request.getStableHow(),
+          Nfs3Constant.WRITE_COMMIT_VERF);
     }
 
     return null;
@@ -816,11 +850,9 @@ public class RpcProgramNfs3 extends RpcP
 
   @Override
   public CREATE3Response create(XDR xdr, RpcInfo info) {
-    SecurityHandler securityHandler = getSecurityHandler(info);
-    SocketAddress remoteAddress = info.remoteAddress();
-    return create(xdr, securityHandler, remoteAddress);
+    return create(xdr, getSecurityHandler(info), info.remoteAddress());
   }
-  
+
   @VisibleForTesting
   CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
       SocketAddress remoteAddress) {
@@ -830,7 +862,7 @@ public class RpcProgramNfs3 extends RpcP
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     CREATE3Request request = null;
 
     try {
@@ -868,7 +900,7 @@ public class RpcProgramNfs3 extends RpcP
         LOG.error("Can't get path for dirHandle:" + dirHandle);
         return new CREATE3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, null,
             preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
@@ -881,15 +913,15 @@ public class RpcProgramNfs3 extends RpcP
       FsPermission permission = setAttr3.getUpdateFields().contains(
           SetAttrField.MODE) ? new FsPermission((short) setAttr3.getMode())
           : FsPermission.getDefault().applyUMask(umask);
-          
+
       EnumSet<CreateFlag> flag = (createMode != Nfs3Constant.CREATE_EXCLUSIVE) ? 
           EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : 
           EnumSet.of(CreateFlag.CREATE);
-      
+
       fos = new HdfsDataOutputStream(dfsClient.create(fileIdPath, permission,
           flag, false, replication, blockSize, null, bufferSize, null),
           statistics);
-      
+
       if ((createMode == Nfs3Constant.CREATE_UNCHECKED)
           || (createMode == Nfs3Constant.CREATE_GUARDED)) {
         // Set group if it's not specified in the request.
@@ -903,7 +935,7 @@ public class RpcProgramNfs3 extends RpcP
       postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           dfsClient, dirFileIdPath, iug);
-      
+
       // Add open stream
       OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
           writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
@@ -920,7 +952,7 @@ public class RpcProgramNfs3 extends RpcP
               + fileHandle.getFileId());
         }
       }
-      
+
     } catch (IOException e) {
       LOG.error("Exception", e);
       if (fos != null) {
@@ -940,29 +972,30 @@ public class RpcProgramNfs3 extends RpcP
               + dirHandle.getFileId(), e1);
         }
       }
-      if (e instanceof AccessControlException) {
-        return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, fileHandle,
-            postOpObjAttr, dirWcc);
-      } else {
-        return new CREATE3Response(Nfs3Status.NFS3ERR_IO, fileHandle,
-            postOpObjAttr, dirWcc);
-      }
+
+      int status = mapErrorStatus(e);
+      return new CREATE3Response(status, fileHandle, postOpObjAttr, dirWcc);
     }
-    
+
     return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr,
         dirWcc);
   }
 
   @Override
   public MKDIR3Response mkdir(XDR xdr, RpcInfo info) {
+    return mkdir(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     MKDIR3Request request = null;
 
     try {
@@ -992,11 +1025,11 @@ public class RpcProgramNfs3 extends RpcP
         return new MKDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
 
-      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new MKDIR3Response(Nfs3Status.NFS3ERR_ACCES, null, preOpDirAttr,
             new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr));
       }
-      
+
       final String fileIdPath = dirFileIdPath + "/" + fileName;
       SetAttr3 setAttr3 = request.getObjAttr();
       FsPermission permission = setAttr3.getUpdateFields().contains(
@@ -1015,7 +1048,7 @@ public class RpcProgramNfs3 extends RpcP
         setAttr3.setGid(securityHandler.getGid());
       }
       setattrInternal(dfsClient, fileIdPath, setAttr3, false);
-      
+
       postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       objFileHandle = new FileHandle(postOpObjAttr.getFileId());
       WccData dirWcc = Nfs3Utils.createWccData(
@@ -1032,15 +1065,11 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e);
         }
       }
+
       WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           postOpDirAttr);
-      if (e instanceof AccessControlException) {
-        return new MKDIR3Response(Nfs3Status.NFS3ERR_PERM, objFileHandle,
-            postOpObjAttr, dirWcc);
-      } else {
-        return new MKDIR3Response(Nfs3Status.NFS3ERR_IO, objFileHandle,
-            postOpObjAttr, dirWcc);
-      }
+      int status = mapErrorStatus(e);
+      return new MKDIR3Response(status, objFileHandle, postOpObjAttr, dirWcc);
     }
   }
 
@@ -1048,21 +1077,22 @@ public class RpcProgramNfs3 extends RpcP
   public READDIR3Response mknod(XDR xdr, RpcInfo info) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
-  
+
   @Override
   public REMOVE3Response remove(XDR xdr, RpcInfo info) {
     return remove(xdr, getSecurityHandler(info), info.remoteAddress());
   }
- 
+
   @VisibleForTesting
-  REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) {
+  REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     REMOVE3Request request = null;
     try {
       request = new REMOVE3Request(xdr);
@@ -1120,26 +1150,29 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e1);
         }
       }
+
       WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           postOpDirAttr);
-      if (e instanceof AccessControlException) {
-        return new REMOVE3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
-      } else {
-        return new REMOVE3Response(Nfs3Status.NFS3ERR_IO, dirWcc);
-      }
+      int status = mapErrorStatus(e);
+      return new REMOVE3Response(status, dirWcc);
     }
   }
 
   @Override
   public RMDIR3Response rmdir(XDR xdr, RpcInfo info) {
+    return rmdir(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     RMDIR3Request request = null;
     try {
       request = new RMDIR3Request(xdr);
@@ -1164,10 +1197,10 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for dir fileId:" + dirHandle.getFileId());
         return new RMDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           preOpDirAttr);
-      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, errWcc); 
       }
 
@@ -1179,7 +1212,7 @@ public class RpcProgramNfs3 extends RpcP
       if (!fstat.isDir()) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTDIR, errWcc);
       }
-      
+
       if (fstat.getChildrenNum() > 0) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTEMPTY, errWcc);
       }
@@ -1202,26 +1235,29 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e1);
         }
       }
+
       WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           postOpDirAttr);
-      if (e instanceof AccessControlException) {
-        return new RMDIR3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
-      } else {
-        return new RMDIR3Response(Nfs3Status.NFS3ERR_IO, dirWcc);
-      }
+      int status = mapErrorStatus(e);
+      return new RMDIR3Response(status, dirWcc);
     }
   }
 
   @Override
   public RENAME3Response rename(XDR xdr, RpcInfo info) {
+    return rename(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     RENAME3Request request = null;
     try {
       request = new RENAME3Request(xdr);
@@ -1258,8 +1294,8 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for toHandle fileId:" + toHandle.getFileId());
         return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
-      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         WccData fromWcc = new WccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
             fromPreOpAttr);
         WccData toWcc = new WccData(Nfs3Utils.getWccAttr(toPreOpAttr),
@@ -1280,7 +1316,7 @@ public class RpcProgramNfs3 extends RpcP
       return new RENAME3Response(Nfs3Status.NFS3_OK, fromDirWcc, toDirWcc);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      // Try to return correct WccData      
+      // Try to return correct WccData
       try {
         fromDirWcc = Nfs3Utils.createWccData(
             Nfs3Utils.getWccAttr(fromPreOpAttr), dfsClient, fromDirFileIdPath,
@@ -1291,25 +1327,27 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get postOpDirAttr for " + fromDirFileIdPath + " or"
             + toDirFileIdPath, e1);
       }
-      if (e instanceof AccessControlException) {
-        return new RENAME3Response(Nfs3Status.NFS3ERR_PERM, fromDirWcc,
-            toDirWcc);
-      } else {
-        return new RENAME3Response(Nfs3Status.NFS3ERR_IO, fromDirWcc, toDirWcc);
-      }
+
+      int status = mapErrorStatus(e);
+      return new RENAME3Response(status, fromDirWcc, toDirWcc);
     }
   }
 
   @Override
   public SYMLINK3Response symlink(XDR xdr, RpcInfo info) {
+    return symlink(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
 
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1355,7 +1393,8 @@ public class RpcProgramNfs3 extends RpcP
 
     } catch (IOException e) {
       LOG.warn("Exception:" + e);
-      response.setStatus(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      response.setStatus(status);
       return response;
     }
   }
@@ -1387,28 +1426,27 @@ public class RpcProgramNfs3 extends RpcP
     }
     return dlisting;
   }
-  
+
   @Override
   public READDIR3Response readdir(XDR xdr, RpcInfo info) {
-    SecurityHandler securityHandler = getSecurityHandler(info);
-    SocketAddress remoteAddress = info.remoteAddress();
-    return readdir(xdr, securityHandler, remoteAddress);
+    return readdir(xdr, getSecurityHandler(info), info.remoteAddress());
   }
+
   public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
       SocketAddress remoteAddress) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
-    
+
     if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     READDIR3Request request = null;
     try {
       request = new READDIR3Request(xdr);
@@ -1427,7 +1465,7 @@ public class RpcProgramNfs3 extends RpcP
       LOG.info("Nonpositive count in invalid READDIR request:" + count);
       return new READDIR3Response(Nfs3Status.NFS3_OK);
     }
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READDIR fileId: " + handle.getFileId() + " cookie: "
           + cookie + " count: " + count);
@@ -1492,7 +1530,7 @@ public class RpcProgramNfs3 extends RpcP
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      
+
       dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpAttr == null) {
@@ -1501,21 +1539,22 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new READDIR3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READDIR3Response(status);
     }
 
     /**
      * Set up the dirents in the response. fileId is used as the cookie with one
      * exception. Linux client can either be stuck with "ls" command (on REHL)
      * or report "Too many levels of symbolic links" (Ubuntu).
-     * 
+     *
      * The problem is that, only two items returned, "." and ".." when the
      * namespace is empty. Both of them are "/" with the same cookie(root
      * fileId). Linux client doesn't think such a directory is a real directory.
      * Even though NFS protocol specifies cookie is an opaque data, Linux client
      * somehow doesn't like an empty dir returns same cookie for both "." and
      * "..".
-     * 
+     *
      * The workaround is to use 0 as the cookie for "." and always return "." as
      * the first entry in readdir/readdirplus response.
      */
@@ -1523,7 +1562,7 @@ public class RpcProgramNfs3 extends RpcP
     int n = (int) Math.min(fstatus.length, count-2);
     boolean eof = (n < fstatus.length) ? false : (dlisting
         .getRemainingEntries() == 0);
-    
+
     Entry3[] entries;
     if (cookie == 0) {
       entries = new Entry3[n + 2];
@@ -1543,7 +1582,7 @@ public class RpcProgramNfs3 extends RpcP
             fstatus[i].getLocalName(), fstatus[i].getFileId());
       }
     }
-    
+
     DirList3 dirList = new READDIR3Response.DirList3(entries, eof);
     return new READDIR3Response(Nfs3Status.NFS3_OK, postOpAttr,
         dirStatus.getModificationTime(), dirList);
@@ -1551,9 +1590,7 @@ public class RpcProgramNfs3 extends RpcP
 
   @Override
   public READDIRPLUS3Response readdirplus(XDR xdr, RpcInfo info) {
-    SecurityHandler securityHandler = getSecurityHandler(info);
-    SocketAddress remoteAddress = info.remoteAddress();
-    return readdirplus(xdr, securityHandler, remoteAddress);
+    return readdirplus(xdr, getSecurityHandler(info), info.remoteAddress());
   }
 
   @VisibleForTesting
@@ -1562,12 +1599,12 @@ public class RpcProgramNfs3 extends RpcP
     if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
     }
-    
+
     READDIRPLUS3Request request = null;
     try {
       request = new READDIRPLUS3Request(xdr);
@@ -1592,7 +1629,7 @@ public class RpcProgramNfs3 extends RpcP
       LOG.info("Nonpositive maxcount in invalid READDIRPLUS request:" + maxCount);
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
     }
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: "
           + cookie + " dirCount: " + dirCount + " maxCount: " + maxCount);
@@ -1655,7 +1692,7 @@ public class RpcProgramNfs3 extends RpcP
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      
+
       dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpDirAttr == null) {
@@ -1664,19 +1701,20 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READDIRPLUS3Response(status);
     }
-    
+
     // Set up the dirents in the response
     HdfsFileStatus[] fstatus = dlisting.getPartialListing();
     int n = (int) Math.min(fstatus.length, dirCount-2);
     boolean eof = (n < fstatus.length) ? false : (dlisting
         .getRemainingEntries() == 0);
-    
+
     READDIRPLUS3Response.EntryPlus3[] entries;
     if (cookie == 0) {
       entries = new READDIRPLUS3Response.EntryPlus3[n+2];
-      
+
       entries[0] = new READDIRPLUS3Response.EntryPlus3(
           postOpDirAttr.getFileId(), ".", 0, postOpDirAttr, new FileHandle(
               postOpDirAttr.getFileId()));
@@ -1720,23 +1758,28 @@ public class RpcProgramNfs3 extends RpcP
     return new READDIRPLUS3Response(Nfs3Status.NFS3_OK, postOpDirAttr,
         dirStatus.getModificationTime(), dirListPlus);
   }
-  
+
   @Override
   public FSSTAT3Response fsstat(XDR xdr, RpcInfo info) {
+    return fsstat(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     FSSTAT3Request request = null;
     try {
       request = new FSSTAT3Request(xdr);
@@ -1754,14 +1797,14 @@ public class RpcProgramNfs3 extends RpcP
       FsStatus fsStatus = dfsClient.getDiskStatus();
       long totalBytes = fsStatus.getCapacity();
       long freeBytes = fsStatus.getRemaining();
-      
+
       Nfs3FileAttributes attrs = writeManager.getFileAttr(dfsClient, handle,
           iug);
       if (attrs == null) {
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         return new FSSTAT3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       long maxFsObjects = config.getLong("dfs.max.objects", 0);
       if (maxFsObjects == 0) {
         // A value of zero in HDFS indicates no limit to the number
@@ -1769,7 +1812,7 @@ public class RpcProgramNfs3 extends RpcP
         // Long.MAX_VALUE so 32bit client won't complain.
         maxFsObjects = Integer.MAX_VALUE;
       }
-      
+
       return new FSSTAT3Response(Nfs3Status.NFS3_OK, attrs, totalBytes,
           freeBytes, freeBytes, maxFsObjects, maxFsObjects, maxFsObjects, 0);
     } catch (RemoteException r) {
@@ -1785,26 +1828,32 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new FSSTAT3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new FSSTAT3Response(status);
     }
   }
 
   @Override
   public FSINFO3Response fsinfo(XDR xdr, RpcInfo info) {
+    return fsinfo(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     FSINFO3Request request = null;
     try {
       request = new FSINFO3Request(xdr);
@@ -1835,7 +1884,7 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         return new FSINFO3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       int fsProperty = Nfs3Constant.FSF3_CANSETTIME
           | Nfs3Constant.FSF3_HOMOGENEOUS;
 
@@ -1843,26 +1892,32 @@ public class RpcProgramNfs3 extends RpcP
           wtmax, wtmax, 1, dtperf, Long.MAX_VALUE, new NfsTime(1), fsProperty);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new FSINFO3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new FSINFO3Response(status);
     }
   }
 
   @Override
   public PATHCONF3Response pathconf(XDR xdr, RpcInfo info) {
+    return pathconf(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     PATHCONF3Request request = null;
     try {
       request = new PATHCONF3Request(xdr);
@@ -1890,22 +1945,30 @@ public class RpcProgramNfs3 extends RpcP
           HdfsConstants.MAX_PATH_LENGTH, true, false, false, true);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new PATHCONF3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new PATHCONF3Response(status);
     }
   }
 
   @Override
   public COMMIT3Response commit(XDR xdr, RpcInfo info) {
-    //Channel channel, int xid,
-    //    SecurityHandler securityHandler, InetAddress client) {
-    COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     SecurityHandler securityHandler = getSecurityHandler(info);
+    RpcCall rpcCall = (RpcCall) info.header();
+    int xid = rpcCall.getXid();
+    SocketAddress remoteAddress = info.remoteAddress();
+    return commit(xdr, info.channel(), xid, securityHandler, remoteAddress);
+  }
+
+  @VisibleForTesting
+  COMMIT3Response commit(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, SocketAddress remoteAddress) {
+    COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     COMMIT3Request request = null;
     try {
       request = new COMMIT3Request(xdr);
@@ -1929,21 +1992,19 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         return new COMMIT3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
-      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new COMMIT3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr),
             Nfs3Constant.WRITE_COMMIT_VERF);
       }
-      
+
       long commitOffset = (request.getCount() == 0) ? 0
           : (request.getOffset() + request.getCount());
-      
+
       // Insert commit as an async request
-      RpcCall rpcCall = (RpcCall) info.header();
-      int xid = rpcCall.getXid();
       writeManager.handleCommit(dfsClient, handle, commitOffset,
-          info.channel(), xid, preOpAttr);
+          channel, xid, preOpAttr);
       return null;
     } catch (IOException e) {
       LOG.warn("Exception ", e);
@@ -1953,9 +2014,11 @@ public class RpcProgramNfs3 extends RpcP
       } catch (IOException e1) {
         LOG.info("Can't get postOpAttr for fileId: " + handle.getFileId(), e1);
       }
+
       WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
-      return new COMMIT3Response(Nfs3Status.NFS3ERR_IO, fileWcc,
-          Nfs3Constant.WRITE_COMMIT_VERF);
+      int status = mapErrorStatus(e);
+      return new COMMIT3Response(status, fileWcc,
+        Nfs3Constant.WRITE_COMMIT_VERF);
     }
   }
 
@@ -1973,7 +2036,7 @@ public class RpcProgramNfs3 extends RpcP
     RpcCall rpcCall = (RpcCall) info.header();
     return getSecurityHandler(rpcCall.getCredential(), rpcCall.getVerifier());
   }
-  
+
   @Override
   public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
     RpcCall rpcCall = (RpcCall) info.header();
@@ -1986,7 +2049,7 @@ public class RpcProgramNfs3 extends RpcP
     InetAddress client = ((InetSocketAddress) info.remoteAddress())
         .getAddress();
     Credentials credentials = rpcCall.getCredential();
-    
+
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
       if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
@@ -2023,7 +2086,7 @@ public class RpcProgramNfs3 extends RpcP
         }
       }
     }
-    
+
     NFS3Response response = null;
     if (nfsproc3 == NFSPROC3.NULL) {
       response = nullProcedure();
@@ -2040,7 +2103,7 @@ public class RpcProgramNfs3 extends RpcP
     } else if (nfsproc3 == NFSPROC3.READ) {
       if (LOG.isDebugEnabled()) {
           LOG.debug(Nfs3Utils.READ_RPC_START + xid);
-      }    
+      }
       response = read(xdr, info);
       if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
         LOG.debug(Nfs3Utils.READ_RPC_END + xid);
@@ -2053,7 +2116,7 @@ public class RpcProgramNfs3 extends RpcP
       // Write end debug trace is in Nfs3Utils.writeChannel
     } else if (nfsproc3 == NFSPROC3.CREATE) {
       response = create(xdr, info);
-    } else if (nfsproc3 == NFSPROC3.MKDIR) {      
+    } else if (nfsproc3 == NFSPROC3.MKDIR) {
       response = mkdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.SYMLINK) {
       response = symlink(xdr, info);
@@ -2104,18 +2167,12 @@ public class RpcProgramNfs3 extends RpcP
 
     RpcUtil.sendRpcResponse(ctx, rsp);
   }
-  
+
   @Override
   protected boolean isIdempotent(RpcCall call) {
-    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(call.getProcedure()); 
+    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(call.getProcedure());
     return nfsproc3 == null || nfsproc3.isIdempotent();
   }
-  
-  private boolean checkAccessPrivilege(RpcInfo info,
-      final AccessPrivilege expected) {
-    SocketAddress remoteAddress = info.remoteAddress();
-    return checkAccessPrivilege(remoteAddress, expected);
-  }
 
   private boolean checkAccessPrivilege(SocketAddress remoteAddress,
       final AccessPrivilege expected) {
@@ -2139,7 +2196,7 @@ public class RpcProgramNfs3 extends RpcP
     }
     return true;
   }
-  
+
   @VisibleForTesting
   WriteManager getWriteManager() {
     return this.writeManager;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java?rev=1615702&r1=1615701&r2=1615702&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java Mon Aug  4 17:40:02 2014
@@ -18,19 +18,603 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import org.jboss.netty.channel.Channel;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
+import org.apache.hadoop.nfs.nfs3.Nfs3Status;
+import org.apache.hadoop.nfs.nfs3.request.LOOKUP3Request;
+import org.apache.hadoop.nfs.nfs3.request.READ3Request;
+import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.ACCESS3Response;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
+import org.apache.hadoop.nfs.nfs3.response.CREATE3Response;
+import org.apache.hadoop.nfs.nfs3.response.FSSTAT3Response;
+import org.apache.hadoop.nfs.nfs3.response.FSINFO3Response;
+import org.apache.hadoop.nfs.nfs3.response.GETATTR3Response;
+import org.apache.hadoop.nfs.nfs3.response.LOOKUP3Response;
+import org.apache.hadoop.nfs.nfs3.response.PATHCONF3Response;
+import org.apache.hadoop.nfs.nfs3.response.READ3Response;
+import org.apache.hadoop.nfs.nfs3.response.REMOVE3Response;
+import org.apache.hadoop.nfs.nfs3.response.RMDIR3Response;
+import org.apache.hadoop.nfs.nfs3.response.RENAME3Response;
+import org.apache.hadoop.nfs.nfs3.response.READDIR3Response;
+import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response;
+import org.apache.hadoop.nfs.nfs3.response.READLINK3Response;
+import org.apache.hadoop.nfs.nfs3.response.SETATTR3Response;
+import org.apache.hadoop.nfs.nfs3.response.SYMLINK3Response;
+import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
+import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
+import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.SecurityHandler;
+import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
+import org.apache.hadoop.security.authorize.ProxyUsers;
 
 
 /**
  * Tests for {@link RpcProgramNfs3}
  */
 public class TestRpcProgramNfs3 {
+  static DistributedFileSystem hdfs;
+  static MiniDFSCluster cluster = null;
+  static NfsConfiguration config = new NfsConfiguration();
+  static NameNode nn;
+  static Nfs3 nfs;
+  static RpcProgramNfs3 nfsd;
+  static SecurityHandler securityHandler;
+  static SecurityHandler securityHandlerUnpriviledged;
+  static String testdir = "/tmp";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    String currentUser = System.getProperty("user.name");
+
+    config.set("fs.permissions.umask-mode", "u=rwx,g=,o=");
+    config.set(DefaultImpersonationProvider.getTestProvider()
+        .getProxySuperuserGroupConfKey(currentUser), "*");
+    config.set(DefaultImpersonationProvider.getTestProvider()
+        .getProxySuperuserIpConfKey(currentUser), "*");
+    ProxyUsers.refreshSuperUserGroupsConfiguration(config);
+
+    cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+    cluster.waitActive();
+    hdfs = cluster.getFileSystem();
+    nn = cluster.getNameNode();
+
+    // Use ephemeral ports in case tests are running in parallel
+    config.setInt("nfs3.mountd.port", 0);
+    config.setInt("nfs3.server.port", 0);
+
+    // Start NFS with allowed.hosts set to "* rw"
+    config.set("dfs.nfs.exports.allowed.hosts", "* rw");
+    nfs = new Nfs3(config);
+    nfs.startServiceInternal(false);
+    nfsd = (RpcProgramNfs3) nfs.getRpcProgram();
+
+
+    // Mock SecurityHandler which returns system user.name
+    securityHandler = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandler.getUser()).thenReturn(currentUser);
+
+    // Mock SecurityHandler which returns a dummy username "harry"
+    securityHandlerUnpriviledged = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandlerUnpriviledged.getUser()).thenReturn("harry");
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void createFiles() throws IllegalArgumentException, IOException {
+    hdfs.delete(new Path(testdir), true);
+    hdfs.mkdirs(new Path(testdir));
+    hdfs.mkdirs(new Path(testdir + "/foo"));
+    DFSTestUtil.createFile(hdfs, new Path(testdir + "/bar"), 0, (short) 1, 0);
+  }
+
+  @Test(timeout = 60000)
+  public void testGetattr() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    GETATTR3Response response1 = nfsd.getattr(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    GETATTR3Response response2 = nfsd.getattr(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testSetattr() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("bar");
+    SetAttr3 symAttr = new SetAttr3();
+    symAttr.serialize(xdr_req);
+    xdr_req.writeBoolean(false);
+
+    // Attempt by an unprivileged user should fail.
+    SETATTR3Response response1 = nfsd.setattr(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    SETATTR3Response response2 = nfsd.setattr(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testLookup() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    LOOKUP3Request lookupReq = new LOOKUP3Request(handle, "bar");
+    XDR xdr_req = new XDR();
+    lookupReq.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    LOOKUP3Response response1 = nfsd.lookup(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    LOOKUP3Response response2 = nfsd.lookup(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testAccess() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    ACCESS3Response response1 = nfsd.access(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    ACCESS3Response response2 = nfsd.access(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testReadlink() throws Exception {
+    // Create a symlink first.
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("fubar");
+    SetAttr3 symAttr = new SetAttr3();
+    symAttr.serialize(xdr_req);
+    xdr_req.writeString("bar");
+
+    SYMLINK3Response response = nfsd.symlink(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response.getStatus());
+
+    // Now perform readlink operations.
+    FileHandle handle2 = response.getObjFileHandle();
+    XDR xdr_req2 = new XDR();
+    handle2.serialize(xdr_req2);
+
+    // Attempt by an unprivileged user should fail.
+    READLINK3Response response1 = nfsd.readlink(xdr_req2.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    READLINK3Response response2 = nfsd.readlink(xdr_req2.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testRead() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+
+    READ3Request readReq = new READ3Request(handle, 0, 5);
+    XDR xdr_req = new XDR();
+    readReq.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    /* Hits HDFS-6582. It needs to be fixed first.
+    READ3Response response1 = nfsd.read(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+    */
+
+    // Attempt by a privileged user should pass.
+    READ3Response response2 = nfsd.read(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testWrite() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+
+    byte[] buffer = new byte[10];
+    for (int i = 0; i < 10; i++) {
+      buffer[i] = (byte) i;
+    }
+
+    WRITE3Request writeReq = new WRITE3Request(handle, 0, 10,
+        WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
+    XDR xdr_req = new XDR();
+    writeReq.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    WRITE3Response response1 = nfsd.write(xdr_req.asReadOnlyWrap(),
+        null, 1, securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    WRITE3Response response2 = nfsd.write(xdr_req.asReadOnlyWrap(),
+        null, 1, securityHandler,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect response:", null, response2);
+  }
+
+  @Test(timeout = 60000)
+  public void testCreate() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("fubar");
+    xdr_req.writeInt(Nfs3Constant.CREATE_UNCHECKED);
+    SetAttr3 symAttr = new SetAttr3();
+    symAttr.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    CREATE3Response response1 = nfsd.create(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    CREATE3Response response2 = nfsd.create(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testMkdir() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("fubar");
+    SetAttr3 symAttr = new SetAttr3();
+    symAttr.serialize(xdr_req);
+    xdr_req.writeString("bar");
+
+    // Attempt by an unprivileged user should fail (NOTE: exercises symlink, not mkdir).
+    SYMLINK3Response response1 = nfsd.symlink(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    SYMLINK3Response response2 = nfsd.symlink(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testSymlink() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("fubar");
+    SetAttr3 symAttr = new SetAttr3();
+    symAttr.serialize(xdr_req);
+    xdr_req.writeString("bar");
+
+    // Attempt by an unprivileged user should fail.
+    SYMLINK3Response response1 = nfsd.symlink(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    SYMLINK3Response response2 = nfsd.symlink(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testRemove() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("bar");
+
+    // Attempt by an unprivileged user should fail.
+    REMOVE3Response response1 = nfsd.remove(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    REMOVE3Response response2 = nfsd.remove(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testRmdir() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("foo");
+
+    // Attempt by an unprivileged user should fail.
+    RMDIR3Response response1 = nfsd.rmdir(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    RMDIR3Response response2 = nfsd.rmdir(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testRename() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("bar");
+    handle.serialize(xdr_req);
+    xdr_req.writeString("fubar");
+
+    // RENAME via the unprivileged handler must return NFS3ERR_ACCES.
+    RENAME3Response response1 = nfsd.rename(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // The same request via the privileged handler must return NFS3_OK.
+    RENAME3Response response2 = nfsd.rename(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testReaddir() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+    xdr_req.writeLongAsHyper(0);
+    xdr_req.writeLongAsHyper(0);
+    xdr_req.writeInt(100);
+
+    // READDIR via the unprivileged handler must return NFS3ERR_ACCES.
+    READDIR3Response response1 = nfsd.readdir(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // The same request via the privileged handler must return NFS3_OK.
+    READDIR3Response response2 = nfsd.readdir(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testReaddirplus() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+    xdr_req.writeLongAsHyper(0);
+    xdr_req.writeLongAsHyper(0);
+    xdr_req.writeInt(3);
+    xdr_req.writeInt(2);
+
+    // READDIRPLUS via the unprivileged handler must return NFS3ERR_ACCES.
+    READDIRPLUS3Response response1 = nfsd.readdirplus(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // The same request via the privileged handler must return NFS3_OK.
+    READDIRPLUS3Response response2 = nfsd.readdirplus(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testFsstat() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+
+    // FSSTAT via the unprivileged handler must return NFS3ERR_ACCES.
+    FSSTAT3Response response1 = nfsd.fsstat(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // The same request via the privileged handler must return NFS3_OK.
+    FSSTAT3Response response2 = nfsd.fsstat(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testFsinfo() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+
+    // FSINFO via the unprivileged handler must return NFS3ERR_ACCES.
+    FSINFO3Response response1 = nfsd.fsinfo(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // The same request via the privileged handler must return NFS3_OK.
+    FSINFO3Response response2 = nfsd.fsinfo(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testPathconf() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+
+    // PATHCONF via the unprivileged handler must return NFS3ERR_ACCES.
+    PATHCONF3Response response1 = nfsd.pathconf(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // The same request via the privileged handler must return NFS3_OK.
+    PATHCONF3Response response2 = nfsd.pathconf(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testCommit() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+    xdr_req.writeLongAsHyper(0);
+    xdr_req.writeInt(5);
+
+    Channel ch = Mockito.mock(Channel.class);
+
+    // COMMIT via the unprivileged handler must return NFS3ERR_ACCES.
+    COMMIT3Response response1 = nfsd.commit(xdr_req.asReadOnlyWrap(),
+        ch, 1, securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Via the privileged handler, commit is expected to return null here.
+    COMMIT3Response response2 = nfsd.commit(xdr_req.asReadOnlyWrap(),
+        ch, 1, securityHandler,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect COMMIT3Response:", null, response2);
+  }
+
   @Test(timeout=1000)
   public void testIdempotent() {
     Object[][] procedures = {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1615702&r1=1615701&r2=1615702&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Aug  4 17:40:02 2014
@@ -437,6 +437,9 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-5185. DN fails to startup if one of the data dir is full. (vinayakumarb)
 
+    HDFS-6451. NFS should not return NFS3ERR_IO for AccessControlException.
+    (Abhiraj Butala via brandonli)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES