You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/08/23 23:14:43 UTC

svn commit: r1517040 - in /hadoop/common/trunk/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/...

Author: brandonli
Date: Fri Aug 23 21:14:43 2013
New Revision: 1517040

URL: http://svn.apache.org/r1517040
Log:
HDFS-4947 Add NFS server export table to control export by hostname or IP range. Contributed by Jing Zhao

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/AccessPrivilege.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/NfsExports.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/security/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/security/TestNfsExports.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1517040&r1=1517039&r2=1517040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Fri Aug 23 21:14:43 2013
@@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.nfs.security.AccessPrivilege;
+import org.apache.hadoop.hdfs.nfs.security.NfsExports;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.mount.MountEntry;
@@ -59,6 +61,8 @@ public class RpcProgramMountd extends Rp
   
   /** List that is unmodifiable */
   private final List<String> exports;
+  
+  private final NfsExports hostsMatcher;
 
   public RpcProgramMountd() throws IOException {
     this(new ArrayList<String>(0));
@@ -72,19 +76,29 @@ public class RpcProgramMountd extends Rp
       throws IOException {
     // Note that RPC cache is not enabled
     super("mountd", "localhost", PORT, PROGRAM, VERSION_1, VERSION_3, 0);
+    
+    this.hostsMatcher = NfsExports.getInstance(config);
     this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
     this.exports = Collections.unmodifiableList(exports);
     this.dfsClient = new DFSClient(NameNode.getAddress(config), config);
   }
   
+  @Override
   public XDR nullOp(XDR out, int xid, InetAddress client) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("MOUNT NULLOP : " + " client: " + client);
     }
-    return  RpcAcceptedReply.voidReply(out, xid);
+    return RpcAcceptedReply.voidReply(out, xid);
   }
 
+  @Override
   public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) {
+    AccessPrivilege accessPrivilege = hostsMatcher.getAccessPrivilege(client);
+    if (accessPrivilege == AccessPrivilege.NONE) {
+      return MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_ACCES, out, xid,
+          null);
+    }
+
     String path = xdr.readString();
     if (LOG.isDebugEnabled()) {
       LOG.debug("MOUNT MNT path: " + path + " client: " + client);
@@ -121,6 +135,7 @@ public class RpcProgramMountd extends Rp
     return out;
   }
 
+  @Override
   public XDR dump(XDR out, int xid, InetAddress client) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("MOUNT NULLOP : " + " client: " + client);
@@ -131,6 +146,7 @@ public class RpcProgramMountd extends Rp
     return out;
   }
 
+  @Override
   public XDR umnt(XDR xdr, XDR out, int xid, InetAddress client) {
     String path = xdr.readString();
     if (LOG.isDebugEnabled()) {
@@ -143,6 +159,7 @@ public class RpcProgramMountd extends Rp
     return out;
   }
 
+  @Override
   public XDR umntall(XDR out, int xid, InetAddress client) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("MOUNT UMNTALL : " + " client: " + client);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java?rev=1517040&r1=1517039&r2=1517040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java Fri Aug 23 21:14:43 2013
@@ -32,12 +32,17 @@ import org.apache.hadoop.util.StringUtil
  * Only TCP server is supported and UDP is not supported.
  */
 public class Nfs3 extends Nfs3Base {
+  static {
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+  }
+  
   public Nfs3(List<String> exports) throws IOException {
-    super(new Mountd(exports), new RpcProgramNfs3(exports));
+    super(new Mountd(exports), new RpcProgramNfs3());
   }
 
   public Nfs3(List<String> exports, Configuration config) throws IOException {
-    super(new Mountd(exports, config), new RpcProgramNfs3(exports, config));
+    super(new Mountd(exports, config), new RpcProgramNfs3(config));
   }
 
   public static void main(String[] args) throws IOException {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java?rev=1517040&r1=1517039&r2=1517040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java Fri Aug 23 21:14:43 2013
@@ -88,6 +88,7 @@ public class Nfs3Utils {
     return new WccAttr(attr.getSize(), attr.getMtime(), attr.getCtime());
   }
 
+  // TODO: maybe not efficient
   public static WccData createWccData(final WccAttr preOpAttr,
       DFSClient dfsClient, final String fileIdPath, final IdUserGroup iug)
       throws IOException {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1517040&r1=1517039&r2=1517040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Fri Aug 23 21:14:43 2013
@@ -22,22 +22,23 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.security.AccessPrivilege;
+import org.apache.hadoop.hdfs.nfs.security.NfsExports;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -125,6 +126,8 @@ public class RpcProgramNfs3 extends RpcP
   private final IdUserGroup iug;// = new IdUserGroup();
   private final DFSClientCache clientCache;
 
+  private final NfsExports exports;
+  
   /**
    * superUserClient should always impersonate HDFS file system owner to send
    * requests which requires supergroup privilege. This requires the same user
@@ -138,17 +141,19 @@ public class RpcProgramNfs3 extends RpcP
   private Statistics statistics;
   private String writeDumpDir; // The dir save dump files
   
-  public RpcProgramNfs3(List<String> exports) throws IOException {
-    this(exports, new Configuration());
+  public RpcProgramNfs3() throws IOException {
+    this(new Configuration());
   }
 
-  public RpcProgramNfs3(List<String> exports, Configuration config)
+  public RpcProgramNfs3(Configuration config)
       throws IOException {
     super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM,
         Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100);
    
     config.set(FsPermission.UMASK_LABEL, "000");
     iug = new IdUserGroup();
+    
+    exports = NfsExports.getInstance(config);
     writeManager = new WriteManager(iug, config);
     clientCache = new DFSClientCache(config);
     superUserClient = new DFSClient(NameNode.getAddress(config), config);
@@ -185,7 +190,8 @@ public class RpcProgramNfs3 extends RpcP
   /******************************************************
    * RPC call handlers
    ******************************************************/
-  
+
+  @Override
   public NFS3Response nullProcedure() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS NULL");
@@ -193,8 +199,16 @@ public class RpcProgramNfs3 extends RpcP
     return new VoidResponse(Nfs3Status.NFS3_OK);
   }
 
-  public GETATTR3Response getattr(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public GETATTR3Response getattr(XDR xdr, RpcAuthSys authSys,
+      InetAddress client) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
+    
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+    
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
     if (dfsClient == null) {
@@ -267,7 +281,9 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public SETATTR3Response setattr(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public SETATTR3Response setattr(XDR xdr, RpcAuthSys authSys,
+      InetAddress client) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
@@ -298,34 +314,39 @@ public class RpcProgramNfs3 extends RpcP
     }
 
     String fileIdPath = Nfs3Utils.getFileIdPath(handle);
-    WccAttr preOpAttr = null;
+    Nfs3FileAttributes preOpAttr = null;
     try {
-      preOpAttr = Nfs3Utils.getWccAttr(dfsClient, fileIdPath);
+      preOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       if (preOpAttr == null) {
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         response.setStatus(Nfs3Status.NFS3ERR_STALE);
         return response;
       }
+      WccAttr preOpWcc = Nfs3Utils.getWccAttr(preOpAttr);
       if (request.isCheck()) {
         if (!preOpAttr.getCtime().equals(request.getCtime())) {
-          WccData wccData = Nfs3Utils.createWccData(preOpAttr, dfsClient,
-              fileIdPath, iug);
+          WccData wccData = new WccData(preOpWcc, preOpAttr);
           return new SETATTR3Response(Nfs3Status.NFS3ERR_NOT_SYNC, wccData);
         }
       }
+      
+      // check the write access privilege
+      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+        return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
+            preOpWcc, preOpAttr));
+      }
 
       setattrInternal(dfsClient, fileIdPath, request.getAttr(), true);
       Nfs3FileAttributes postOpAttr = Nfs3Utils.getFileAttr(dfsClient,
           fileIdPath, iug);
-      WccData wccData = new WccData(preOpAttr, postOpAttr);
+      WccData wccData = new WccData(preOpWcc, postOpAttr);
       return new SETATTR3Response(Nfs3Status.NFS3_OK, wccData);
-
     } catch (IOException e) {
       LOG.warn("Exception ", e);
       WccData wccData = null;
       try {
-        wccData = Nfs3Utils
-            .createWccData(preOpAttr, dfsClient, fileIdPath, iug);
+        wccData = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpAttr),
+            dfsClient, fileIdPath, iug);
       } catch (IOException e1) {
         LOG.info("Can't get postOpAttr for fileIdPath: " + fileIdPath);
       }
@@ -337,8 +358,15 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public LOOKUP3Response lookup(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public LOOKUP3Response lookup(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
+    
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+    
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
     if (dfsClient == null) {
@@ -392,8 +420,15 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
   
-  public ACCESS3Response access(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public ACCESS3Response access(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
+    
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+    
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
     if (dfsClient == null) {
@@ -434,12 +469,20 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READLINK3Response readlink(XDR xdr, RpcAuthSys authSys) {
+  public READLINK3Response readlink(XDR xdr, RpcAuthSys authSys,
+      InetAddress client) {
     return new READLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
-  public READ3Response read(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public READ3Response read(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
+    
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+    
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
     if (dfsClient == null) {
@@ -528,8 +571,9 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
+  @Override
   public WRITE3Response write(XDR xdr, Channel channel, int xid,
-      RpcAuthSys authSys) {
+      RpcAuthSys authSys, InetAddress client) {
     WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
@@ -570,6 +614,13 @@ public class RpcProgramNfs3 extends RpcP
         LOG.error("Can't get path for fileId:" + handle.getFileId());
         return new WRITE3Response(Nfs3Status.NFS3ERR_STALE);
       }
+      
+      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+        return new WRITE3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
+            Nfs3Utils.getWccAttr(preOpAttr), preOpAttr), 0, stableHow,
+            Nfs3Constant.WRITE_COMMIT_VERF);
+      }
+      
       if (LOG.isDebugEnabled()) {
         LOG.debug("requesed offset=" + offset + " and current filesize="
             + preOpAttr.getSize());
@@ -596,7 +647,8 @@ public class RpcProgramNfs3 extends RpcP
     return null;
   }
 
-  public CREATE3Response create(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public CREATE3Response create(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
@@ -631,16 +683,22 @@ public class RpcProgramNfs3 extends RpcP
 
     HdfsDataOutputStream fos = null;
     String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
-    WccAttr preOpDirAttr = null;
+    Nfs3FileAttributes preOpDirAttr = null;
     Nfs3FileAttributes postOpObjAttr = null;
     FileHandle fileHandle = null;
     WccData dirWcc = null;
     try {
-      preOpDirAttr = Nfs3Utils.getWccAttr(dfsClient, dirFileIdPath);
+      preOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (preOpDirAttr == null) {
         LOG.error("Can't get path for dirHandle:" + dirHandle);
         return new CREATE3Response(Nfs3Status.NFS3ERR_STALE);
       }
+      
+      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+        return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, null,
+            preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+                preOpDirAttr));
+      }
 
       String fileIdPath = Nfs3Utils.getFileIdPath(dirHandle) + "/" + fileName;
       SetAttr3 setAttr3 = request.getObjAttr();
@@ -649,9 +707,9 @@ public class RpcProgramNfs3 extends RpcP
           SetAttrField.MODE) ? new FsPermission((short) setAttr3.getMode())
           : FsPermission.getDefault().applyUMask(umask);
           
-      EnumSet<CreateFlag> flag = (createMode != Nfs3Constant.CREATE_EXCLUSIVE) ? EnumSet
-          .of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet
-          .of(CreateFlag.CREATE);
+      EnumSet<CreateFlag> flag = (createMode != Nfs3Constant.CREATE_EXCLUSIVE) ? 
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : 
+          EnumSet.of(CreateFlag.CREATE);
       
       fos = new HdfsDataOutputStream(dfsClient.create(fileIdPath, permission,
           flag, false, replication, blockSize, null, bufferSize, null),
@@ -668,8 +726,8 @@ public class RpcProgramNfs3 extends RpcP
       }
 
       postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
-      dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient, dirFileIdPath,
-          iug);
+      dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+          dfsClient, dirFileIdPath, iug);
     } catch (IOException e) {
       LOG.error("Exception", e);
       if (fos != null) {
@@ -682,8 +740,8 @@ public class RpcProgramNfs3 extends RpcP
       }
       if (dirWcc == null) {
         try {
-          dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-              dirFileIdPath, iug);
+          dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+              dfsClient, dirFileIdPath, iug);
         } catch (IOException e1) {
           LOG.error("Can't get postOpDirAttr for dirFileId:"
               + dirHandle.getFileId());
@@ -712,7 +770,8 @@ public class RpcProgramNfs3 extends RpcP
         dirWcc);
   }
 
-  public MKDIR3Response mkdir(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public MKDIR3Response mkdir(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
@@ -739,17 +798,22 @@ public class RpcProgramNfs3 extends RpcP
     }
 
     String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
-    WccAttr preOpDirAttr = null;
+    Nfs3FileAttributes preOpDirAttr = null;
     Nfs3FileAttributes postOpDirAttr = null;
     Nfs3FileAttributes postOpObjAttr = null;
     FileHandle objFileHandle = null;
     try {
-      preOpDirAttr = Nfs3Utils.getWccAttr(dfsClient, dirFileIdPath);
+      preOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (preOpDirAttr == null) {
         LOG.info("Can't get path for dir fileId:" + dirHandle.getFileId());
         return new MKDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
 
+      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+        return new MKDIR3Response(Nfs3Status.NFS3ERR_ACCES, null, preOpDirAttr,
+            new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr));
+      }
+      
       final String fileIdPath = dirFileIdPath + "/" + fileName;
       SetAttr3 setAttr3 = request.getObjAttr();
       FsPermission permission = setAttr3.getUpdateFields().contains(
@@ -757,8 +821,8 @@ public class RpcProgramNfs3 extends RpcP
           : FsPermission.getDefault().applyUMask(umask);
 
       if (!dfsClient.mkdirs(fileIdPath, permission, false)) {
-        WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-            dirFileIdPath, iug);
+        WccData dirWcc = Nfs3Utils.createWccData(
+            Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug);
         return new MKDIR3Response(Nfs3Status.NFS3ERR_IO, null, null, dirWcc);
       }
 
@@ -771,8 +835,8 @@ public class RpcProgramNfs3 extends RpcP
       
       postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       objFileHandle = new FileHandle(postOpObjAttr.getFileId());
-      WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-          dirFileIdPath, iug);
+      WccData dirWcc = Nfs3Utils.createWccData(
+          Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug);
       return new MKDIR3Response(Nfs3Status.NFS3_OK, new FileHandle(
           postOpObjAttr.getFileId()), postOpObjAttr, dirWcc);
     } catch (IOException e) {
@@ -785,7 +849,8 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath);
         }
       }
-      WccData dirWcc = new WccData(preOpDirAttr, postOpDirAttr);
+      WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+          postOpDirAttr);
       if (e instanceof AccessControlException) {
         return new MKDIR3Response(Nfs3Status.NFS3ERR_PERM, objFileHandle,
             postOpObjAttr, dirWcc);
@@ -796,12 +861,12 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-
-  public READDIR3Response mknod(XDR xdr, RpcAuthSys authSys) {
+  public READDIR3Response mknod(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
   
-  public REMOVE3Response remove(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public REMOVE3Response remove(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
@@ -825,10 +890,10 @@ public class RpcProgramNfs3 extends RpcP
     }
 
     String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
-    WccAttr preOpDirAttr = null;
+    Nfs3FileAttributes preOpDirAttr = null;
     Nfs3FileAttributes postOpDirAttr = null;
     try {
-      preOpDirAttr = Nfs3Utils.getWccAttr(dfsClient, dirFileIdPath);
+      preOpDirAttr =  Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (preOpDirAttr == null) {
         LOG.info("Can't get path for dir fileId:" + dirHandle.getFileId());
         return new REMOVE3Response(Nfs3Status.NFS3ERR_STALE);
@@ -838,24 +903,23 @@ public class RpcProgramNfs3 extends RpcP
       HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
           fileIdPath);
       if (fstat == null) {
-        WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-            dirFileIdPath, iug);
+        WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+            preOpDirAttr);
         return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, dirWcc);
       }
       if (fstat.isDir()) {
-        WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-            dirFileIdPath, iug);
+        WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+            preOpDirAttr);
         return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, dirWcc);
       }
 
-      if (dfsClient.delete(fileIdPath, false) == false) {
-        WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-            dirFileIdPath, iug);
+      boolean result = dfsClient.delete(fileIdPath, false);
+      WccData dirWcc = Nfs3Utils.createWccData(
+          Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug);
+
+      if (!result) {
         return new REMOVE3Response(Nfs3Status.NFS3ERR_ACCES, dirWcc);
       }
-
-      WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-          dirFileIdPath, iug);
       return new REMOVE3Response(Nfs3Status.NFS3_OK, dirWcc);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
@@ -867,7 +931,8 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath);
         }
       }
-      WccData dirWcc = new WccData(preOpDirAttr, postOpDirAttr);
+      WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+          postOpDirAttr);
       if (e instanceof AccessControlException) {
         return new REMOVE3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
       } else {
@@ -876,7 +941,8 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public RMDIR3Response rmdir(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public RMDIR3Response rmdir(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
@@ -901,45 +967,43 @@ public class RpcProgramNfs3 extends RpcP
     }
 
     String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
-    WccAttr preOpDirAttr = null;
+    Nfs3FileAttributes preOpDirAttr = null;
     Nfs3FileAttributes postOpDirAttr = null;
     try {
-      preOpDirAttr = Nfs3Utils.getWccAttr(dfsClient, dirFileIdPath);
+      preOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (preOpDirAttr == null) {
         LOG.info("Can't get path for dir fileId:" + dirHandle.getFileId());
         return new RMDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
+      
+      WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+          preOpDirAttr);
+      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+        return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, errWcc); 
+      }
 
       String fileIdPath = dirFileIdPath + "/" + fileName;
       HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
           fileIdPath);
       if (fstat == null) {
-        WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-            dirFileIdPath, iug);
-        return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, dirWcc);
+        return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
       }
       if (!fstat.isDir()) {
-        WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-            dirFileIdPath, iug);
-        return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTDIR, dirWcc);
+        return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTDIR, errWcc);
       }
       
       if (fstat.getChildrenNum() > 0) {
-        WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-            dirFileIdPath, iug);
-        return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTEMPTY, dirWcc);
+        return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTEMPTY, errWcc);
       }
 
-      if (dfsClient.delete(fileIdPath, false) == false) {
-        WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
-            dirFileIdPath, iug);
+      boolean result = dfsClient.delete(fileIdPath, false);
+      WccData dirWcc = Nfs3Utils.createWccData(
+          Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug);
+      if (!result) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, dirWcc);
       }
 
-      postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
-      WccData wccData = new WccData(preOpDirAttr, postOpDirAttr);
-      return new RMDIR3Response(Nfs3Status.NFS3_OK, wccData);
-
+      return new RMDIR3Response(Nfs3Status.NFS3_OK, dirWcc);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
       // Try to return correct WccData
@@ -950,7 +1014,8 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath);
         }
       }
-      WccData dirWcc = new WccData(preOpDirAttr, postOpDirAttr);
+      WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
+          postOpDirAttr);
       if (e instanceof AccessControlException) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
       } else {
@@ -959,7 +1024,8 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public RENAME3Response rename(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public RENAME3Response rename(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
@@ -987,23 +1053,31 @@ public class RpcProgramNfs3 extends RpcP
 
     String fromDirFileIdPath = Nfs3Utils.getFileIdPath(fromHandle);
     String toDirFileIdPath = Nfs3Utils.getFileIdPath(toHandle);
-    WccAttr fromPreOpAttr = null;
-    WccAttr toPreOpAttr = null;
+    Nfs3FileAttributes fromPreOpAttr = null;
+    Nfs3FileAttributes toPreOpAttr = null;
     WccData fromDirWcc = null;
     WccData toDirWcc = null;
     try {
-      fromPreOpAttr = Nfs3Utils.getWccAttr(dfsClient, fromDirFileIdPath);
+      fromPreOpAttr = Nfs3Utils.getFileAttr(dfsClient, fromDirFileIdPath, iug);
       if (fromPreOpAttr == null) {
         LOG.info("Can't get path for fromHandle fileId:"
             + fromHandle.getFileId());
         return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
       }
 
-      toPreOpAttr = Nfs3Utils.getWccAttr(dfsClient, toDirFileIdPath);
+      toPreOpAttr = Nfs3Utils.getFileAttr(dfsClient, toDirFileIdPath, iug);
       if (toPreOpAttr == null) {
         LOG.info("Can't get path for toHandle fileId:" + toHandle.getFileId());
         return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
       }
+      
+      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+        WccData fromWcc = new WccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
+            fromPreOpAttr);
+        WccData toWcc = new WccData(Nfs3Utils.getWccAttr(toPreOpAttr),
+            toPreOpAttr);
+        return new RENAME3Response(Nfs3Status.NFS3ERR_ACCES, fromWcc, toWcc);
+      }
 
       String src = fromDirFileIdPath + "/" + fromName;
       String dst = toDirFileIdPath + "/" + toName;
@@ -1011,20 +1085,20 @@ public class RpcProgramNfs3 extends RpcP
       dfsClient.rename(src, dst, Options.Rename.NONE);
 
       // Assemble the reply
-      fromDirWcc = Nfs3Utils.createWccData(fromPreOpAttr, dfsClient,
-          fromDirFileIdPath, iug);
-      toDirWcc = Nfs3Utils.createWccData(toPreOpAttr, dfsClient,
-          toDirFileIdPath, iug);
+      fromDirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
+          dfsClient, fromDirFileIdPath, iug);
+      toDirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(toPreOpAttr),
+          dfsClient, toDirFileIdPath, iug);
       return new RENAME3Response(Nfs3Status.NFS3_OK, fromDirWcc, toDirWcc);
-
     } catch (IOException e) {
       LOG.warn("Exception ", e);
       // Try to return correct WccData      
       try {
-        fromDirWcc = Nfs3Utils.createWccData(fromPreOpAttr, dfsClient,
-            fromDirFileIdPath, iug);
-        toDirWcc = Nfs3Utils.createWccData(toPreOpAttr, dfsClient,
-            toDirFileIdPath, iug);
+        fromDirWcc = Nfs3Utils.createWccData(
+            Nfs3Utils.getWccAttr(fromPreOpAttr), dfsClient, fromDirFileIdPath,
+            iug);
+        toDirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(toPreOpAttr),
+            dfsClient, toDirFileIdPath, iug);
       } catch (IOException e1) {
         LOG.info("Can't get postOpDirAttr for " + fromDirFileIdPath + " or"
             + toDirFileIdPath);
@@ -1038,16 +1112,25 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys) {
+  public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys,
+      InetAddress client) {
     return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
-  public READDIR3Response link(XDR xdr, RpcAuthSys authSys) {
+  public READDIR3Response link(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
-  public READDIR3Response readdir(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public READDIR3Response readdir(XDR xdr, RpcAuthSys authSys,
+      InetAddress client) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
+    
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+    
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
     if (dfsClient == null) {
@@ -1180,7 +1263,12 @@ public class RpcProgramNfs3 extends RpcP
         dirStatus.getModificationTime(), dirList);
   }
 
-  public READDIRPLUS3Response readdirplus(XDR xdr, RpcAuthSys authSys) {   
+  public READDIRPLUS3Response readdirplus(XDR xdr, RpcAuthSys authSys,
+      InetAddress client) {
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
+    }
+    
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
     if (dfsClient == null) {
@@ -1325,8 +1413,15 @@ public class RpcProgramNfs3 extends RpcP
         dirStatus.getModificationTime(), dirListPlus);
   }
   
-  public FSSTAT3Response fsstat(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public FSSTAT3Response fsstat(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
+    
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+    
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
     if (dfsClient == null) {
@@ -1376,8 +1471,15 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public FSINFO3Response fsinfo(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public FSINFO3Response fsinfo(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
+    
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+    
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
     if (dfsClient == null) {
@@ -1421,8 +1523,16 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public PATHCONF3Response pathconf(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public PATHCONF3Response pathconf(XDR xdr, RpcAuthSys authSys,
+      InetAddress client) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
+    
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+    
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
     if (dfsClient == null) {
@@ -1461,7 +1571,8 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public COMMIT3Response commit(XDR xdr, RpcAuthSys authSys) {
+  @Override
+  public COMMIT3Response commit(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     String uname = authSysCheck(authSys);
     DFSClient dfsClient = clientCache.get(uname);
@@ -1486,13 +1597,20 @@ public class RpcProgramNfs3 extends RpcP
     }
 
     String fileIdPath = Nfs3Utils.getFileIdPath(handle);
-    WccAttr preOpAttr = null;
+    Nfs3FileAttributes preOpAttr = null;
     try {
-      preOpAttr = Nfs3Utils.getWccAttr(dfsClient, fileIdPath);
+      preOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       if (preOpAttr == null) {
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         return new COMMIT3Response(Nfs3Status.NFS3ERR_STALE);
       }
+      
+      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+        return new COMMIT3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
+            Nfs3Utils.getWccAttr(preOpAttr), preOpAttr),
+            Nfs3Constant.WRITE_COMMIT_VERF);
+      }
+      
       long commitOffset = (request.getCount() == 0) ? 0
           : (request.getOffset() + request.getCount());
       
@@ -1504,7 +1622,7 @@ public class RpcProgramNfs3 extends RpcP
       }
       Nfs3FileAttributes postOpAttr = writeManager.getFileAttr(dfsClient,
           handle, iug);
-      WccData fileWcc = new WccData(preOpAttr, postOpAttr);
+      WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
       return new COMMIT3Response(status, fileWcc,
           Nfs3Constant.WRITE_COMMIT_VERF);
 
@@ -1516,7 +1634,7 @@ public class RpcProgramNfs3 extends RpcP
       } catch (IOException e1) {
         LOG.info("Can't get postOpAttr for fileId: " + handle.getFileId());
       }
-      WccData fileWcc = new WccData(preOpAttr, postOpAttr);
+      WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
       return new COMMIT3Response(Nfs3Status.NFS3ERR_IO, fileWcc,
           Nfs3Constant.WRITE_COMMIT_VERF);
     }
@@ -1554,47 +1672,47 @@ public class RpcProgramNfs3 extends RpcP
     if (nfsproc3 == NFSPROC3.NULL) {
       response = nullProcedure();
     } else if (nfsproc3 == NFSPROC3.GETATTR) {
-      response = getattr(xdr, authSys);
+      response = getattr(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.SETATTR) {
-      response = setattr(xdr, authSys);
+      response = setattr(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.LOOKUP) {
-      response = lookup(xdr, authSys);
+      response = lookup(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.ACCESS) {
-      response = access(xdr, authSys);
+      response = access(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.READLINK) {
-      response = readlink(xdr, authSys);
+      response = readlink(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.READ) {
-      response = read(xdr, authSys);
+      response = read(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.WRITE) {
-      response = write(xdr, channel, xid, authSys);
+      response = write(xdr, channel, xid, authSys, client);
     } else if (nfsproc3 == NFSPROC3.CREATE) {
-      response = create(xdr, authSys);
+      response = create(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.MKDIR) {      
-      response = mkdir(xdr, authSys);
+      response = mkdir(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.SYMLINK) {
-      response = symlink(xdr, authSys);
+      response = symlink(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.MKNOD) {
-      response = mknod(xdr, authSys);
+      response = mknod(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.REMOVE) {
-      response = remove(xdr, authSys);
+      response = remove(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.RMDIR) {
-      response = rmdir(xdr, authSys);
+      response = rmdir(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.RENAME) {
-      response = rename(xdr, authSys);
+      response = rename(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.LINK) {
-      response = link(xdr, authSys);
+      response = link(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.READDIR) {
-      response = readdir(xdr, authSys);
+      response = readdir(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
-      response = readdirplus(xdr, authSys);
+      response = readdirplus(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.FSSTAT) {
-      response = fsstat(xdr, authSys);
+      response = fsstat(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.FSINFO) {
-      response = fsinfo(xdr, authSys);
+      response = fsinfo(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
-      response = pathconf(xdr, authSys);
+      response = pathconf(xdr, authSys, client);
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
-      response = commit(xdr, authSys);
+      response = commit(xdr, authSys, client);
     } else {
       // Invalid procedure
       RpcAcceptedReply.voidReply(out, xid,
@@ -1611,4 +1729,17 @@ public class RpcProgramNfs3 extends RpcP
     final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(call.getProcedure()); 
     return nfsproc3 == null || nfsproc3.isIdempotent();
   }
+  
+  private boolean checkAccessPrivilege(final InetAddress client,
+      final AccessPrivilege expected) {
+    AccessPrivilege access = exports.getAccessPrivilege(client);
+    if (access == AccessPrivilege.NONE) {
+      return false;
+    }
+    if (access == AccessPrivilege.READ_ONLY
+        && expected == AccessPrivilege.READ_WRITE) {
+      return false;
+    }
+    return true;
+  }
 }

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/AccessPrivilege.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/AccessPrivilege.java?rev=1517040&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/AccessPrivilege.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/AccessPrivilege.java Fri Aug 23 21:14:43 2013
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.security;
+
+public enum AccessPrivilege {
+  READ_ONLY,
+  READ_WRITE,
+  NONE;
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/NfsExports.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/NfsExports.java?rev=1517040&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/NfsExports.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/security/NfsExports.java Fri Aug 23 21:14:43 2013
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.security;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.SubnetUtils;
+import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
+import org.apache.hadoop.util.LightWeightCache;
+import org.apache.hadoop.util.LightWeightGSet;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class provides functionality for loading and checking the mapping 
+ * between client hosts and their access privileges.
+ */
+public class NfsExports {
+  
+  private static NfsExports exports = null;
+  
+  public static synchronized NfsExports getInstance(Configuration conf) {
+    if (exports == null) {
+      String matchHosts = conf.get(Nfs3Constant.EXPORTS_ALLOWED_HOSTS_KEY,
+          Nfs3Constant.EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT);
+      int cacheSize = conf.getInt(Nfs3Constant.EXPORTS_CACHE_SIZE_KEY,
+          Nfs3Constant.EXPORTS_CACHE_SIZE_DEFAULT);
+      long expirationPeriodNano = conf.getLong(
+          Nfs3Constant.EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY,
+          Nfs3Constant.EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT) * 1000 * 1000;
+      exports = new NfsExports(cacheSize, expirationPeriodNano, matchHosts);
+    }
+    return exports;
+  }
+  
+  public static final Log LOG = LogFactory.getLog(NfsExports.class);
+  
+  // only support IPv4 now
+  private static final String IP_ADDRESS = 
+      "(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})";
+  private static final String SLASH_FORMAT_SHORT = IP_ADDRESS + "/(\\d{1,3})";
+  private static final String SLASH_FORMAT_LONG = IP_ADDRESS + "/" + IP_ADDRESS;
+  
+  private static final Pattern CIDR_FORMAT_SHORT = 
+      Pattern.compile(SLASH_FORMAT_SHORT);
+  
+  private static final Pattern CIDR_FORMAT_LONG = 
+      Pattern.compile(SLASH_FORMAT_LONG);
+  
+  static class AccessCacheEntry implements LightWeightCache.Entry{
+    private final String hostAddr;
+    private AccessPrivilege access;
+    private final long expirationTime; 
+    
+    private LightWeightGSet.LinkedElement next;
+    
+    AccessCacheEntry(String hostAddr, AccessPrivilege access,
+        long expirationTime) {
+      Preconditions.checkArgument(hostAddr != null);
+      this.hostAddr = hostAddr;
+      this.access = access;
+      this.expirationTime = expirationTime;
+    }
+    
+    @Override
+    public int hashCode() {
+      return hostAddr.hashCode();
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj instanceof AccessCacheEntry) {
+        AccessCacheEntry entry = (AccessCacheEntry) obj;
+        return this.hostAddr.equals(entry.hostAddr);
+      }
+      return false;
+    }
+    
+    @Override
+    public void setNext(LinkedElement next) {
+      this.next = next;
+    }
+
+    @Override
+    public LinkedElement getNext() {
+      return this.next;
+    }
+
+    @Override
+    public void setExpirationTime(long timeNano) {
+      // we set expiration time in the constructor, and the expiration time 
+      // does not change
+    }
+
+    @Override
+    public long getExpirationTime() {
+      return this.expirationTime;
+    }
+  }
+
+  private final List<Match> mMatches;
+  
+  private final LightWeightCache<AccessCacheEntry, AccessCacheEntry> accessCache;
+  private final long cacheExpirationPeriod;
+
+  /**
+   * Constructor.
+   * @param cacheSize The size of the access privilege cache.
+   * @param expirationPeriodNano The period 
+   * @param matchingHosts A string specifying one or multiple matchers. 
+   */
+  NfsExports(int cacheSize, long expirationPeriodNano, String matchHosts) {
+    this.cacheExpirationPeriod = expirationPeriodNano;
+    accessCache = new LightWeightCache<AccessCacheEntry, AccessCacheEntry>(
+        cacheSize, cacheSize, expirationPeriodNano, 0);        
+    String[] matchStrings = matchHosts.split(
+        Nfs3Constant.EXPORTS_ALLOWED_HOSTS_SEPARATOR);
+    mMatches = new ArrayList<Match>(matchStrings.length);
+    for(String mStr : matchStrings) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing match string '" + mStr + "'");
+      }
+      mStr = mStr.trim();
+      if(!mStr.isEmpty()) {
+        mMatches.add(getMatch(mStr));
+      }
+    }
+  }
+  
+  public AccessPrivilege getAccessPrivilege(InetAddress addr) {
+    return getAccessPrivilege(addr.getHostAddress(),
+        addr.getCanonicalHostName());
+  }
+  
+  AccessPrivilege getAccessPrivilege(String address, String hostname) {
+    long now = System.nanoTime();
+    AccessCacheEntry newEntry = new AccessCacheEntry(address,
+        AccessPrivilege.NONE, now + this.cacheExpirationPeriod);
+    // check if there is a cache entry for the given address
+    AccessCacheEntry cachedEntry = accessCache.get(newEntry);
+    if (cachedEntry != null && now < cachedEntry.expirationTime) {
+      // get a non-expired cache entry, use it
+      return cachedEntry.access;
+    } else {
+      for(Match match : mMatches) {
+        if(match.isIncluded(address, hostname)) {
+          if (match.accessPrivilege == AccessPrivilege.READ_ONLY) {
+            newEntry.access = AccessPrivilege.READ_ONLY;
+            break;
+          } else if (match.accessPrivilege == AccessPrivilege.READ_WRITE) {
+            newEntry.access = AccessPrivilege.READ_WRITE;
+          }
+        }
+      }
+      accessCache.put(newEntry);
+      return newEntry.access;
+    }
+  }
+
+  private static abstract class Match {
+    private final AccessPrivilege accessPrivilege;
+
+    private Match(AccessPrivilege accessPrivilege) {
+      this.accessPrivilege = accessPrivilege;
+    }
+
+    public abstract boolean isIncluded(String address, String hostname);
+  }
+  
+  /**
+   * Matcher covering all client hosts (specified by "*")
+   */
+  private static class AnonymousMatch extends Match {
+    private AnonymousMatch(AccessPrivilege accessPrivilege) {
+      super(accessPrivilege);
+    }
+  
+    @Override
+    public boolean isIncluded(String ip, String hostname) {
+      return true;
+    }
+  }
+  
+  /**
+   * Matcher using CIDR for client host matching
+   */
+  private static class CIDRMatch extends Match {
+    private final SubnetInfo subnetInfo;
+    
+    private CIDRMatch(AccessPrivilege accessPrivilege, SubnetInfo subnetInfo) {
+      super(accessPrivilege);
+      this.subnetInfo = subnetInfo;
+    }
+    
+    @Override
+    public boolean isIncluded(String address, String hostname) {
+      if(subnetInfo.isInRange(address)) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("CIDRNMatcher low = " + subnetInfo.getLowAddress() +
+              ", high = " + subnetInfo.getHighAddress() +
+              ", allowing client '" + address + "', '" + hostname + "'");
+        }
+        return true;
+      }
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("CIDRNMatcher low = " + subnetInfo.getLowAddress() +
+            ", high = " + subnetInfo.getHighAddress() +
+            ", denying client '" + address + "', '" + hostname + "'");
+      }
+      return false;
+    }
+  }
+  
+  /**
+   * Matcher requiring exact string match for client host
+   */
+  private static class ExactMatch extends Match {
+    private final String ipOrHost;
+    
+    private ExactMatch(AccessPrivilege accessPrivilege, String ipOrHost) {
+      super(accessPrivilege);
+      this.ipOrHost = ipOrHost;
+    }
+    
+    @Override
+    public boolean isIncluded(String address, String hostname) {
+      if(ipOrHost.equalsIgnoreCase(address) ||
+          ipOrHost.equalsIgnoreCase(hostname)) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("ExactMatcher '" + ipOrHost + "', allowing client " +
+              "'" + address + "', '" + hostname + "'");
+        }
+        return true;
+      }
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("ExactMatcher '" + ipOrHost + "', denying client " +
+            "'" + address + "', '" + hostname + "'");
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Matcher where client hosts are specified by regular expression
+   */
+  private static class RegexMatch extends Match {
+    private final Pattern pattern;
+
+    private RegexMatch(AccessPrivilege accessPrivilege, String wildcard) {
+      super(accessPrivilege);
+      this.pattern = Pattern.compile(wildcard, Pattern.CASE_INSENSITIVE);
+    }
+
+    @Override
+    public boolean isIncluded(String address, String hostname) {
+      if (pattern.matcher(address).matches()
+          || pattern.matcher(hostname).matches()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("RegexMatcher '" + pattern.pattern()
+              + "', allowing client '" + address + "', '" + hostname + "'");
+        }
+        return true;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RegexMatcher '" + pattern.pattern()
+            + "', denying client '" + address + "', '" + hostname + "'");
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Loading a matcher from a string. The default access privilege is read-only.
+   * The string contains 1 or 2 parts, separated by whitespace characters, where
+   * the first part specifies the client hosts, and the second part (if 
+   * existent) specifies the access privilege of the client hosts. I.e.,
+   * 
+   * "client-hosts [access-privilege]"
+   */
+  private static Match getMatch(String line) {
+    String[] parts = line.split("\\s+");
+    final String host;
+    AccessPrivilege privilege = AccessPrivilege.READ_ONLY;
+    switch (parts.length) {
+    case 1:
+      host = parts[0].toLowerCase().trim();
+      break;
+    case 2:
+      host = parts[0].toLowerCase().trim();
+      String option = parts[1].trim();
+      if ("rw".equalsIgnoreCase(option)) {
+        privilege = AccessPrivilege.READ_WRITE;
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("Incorrectly formatted line '" + line
+          + "'");
+    }
+    if (host.equals("*")) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using match all for '" + host + "' and " + privilege);
+      }
+      return new AnonymousMatch(privilege);
+    } else if (CIDR_FORMAT_SHORT.matcher(host).matches()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using CIDR match for '" + host + "' and " + privilege);
+      }
+      return new CIDRMatch(privilege, new SubnetUtils(host).getInfo());
+    } else if (CIDR_FORMAT_LONG.matcher(host).matches()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using CIDR match for '" + host + "' and " + privilege);
+      }
+      String[] pair = host.split("/");
+      return new CIDRMatch(privilege,
+          new SubnetUtils(pair[0], pair[1]).getInfo());
+    } else if (host.contains("*") || host.contains("?") || host.contains("[")
+        || host.contains("]")) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using Regex match for '" + host + "' and " + privilege);
+      }
+      return new RegexMatch(privilege, host);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using exact match for '" + host + "' and " + privilege);
+    }
+    return new ExactMatch(privilege, host);
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/security/TestNfsExports.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/security/TestNfsExports.java?rev=1517040&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/security/TestNfsExports.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/security/TestNfsExports.java Fri Aug 23 21:14:43 2013
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.security;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hdfs.nfs.security.AccessPrivilege;
+import org.apache.hadoop.hdfs.nfs.security.NfsExports;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
+import org.junit.Test;
+
+public class TestNfsExports {
+
+  private final String address1 = "192.168.0.1";
+  private final String address2 = "10.0.0.1";
+  private final String hostname1 = "a.b.com";
+  private final String hostname2 = "a.b.org";
+  
+  private static final long ExpirationPeriod = 
+      Nfs3Constant.EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT * 1000 * 1000;
+  
+  private static final int CacheSize = Nfs3Constant.EXPORTS_CACHE_SIZE_DEFAULT;
+
+  @Test
+  public void testWildcardRW() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, "* rw");
+    Assert.assertEquals(AccessPrivilege.READ_WRITE,
+        matcher.getAccessPrivilege(address1, hostname1));
+  }
+
+  @Test
+  public void testWildcardRO() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, "* ro");
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, hostname1));
+  }
+
+  @Test
+  public void testExactAddressRW() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, address1
+        + " rw");
+    Assert.assertEquals(AccessPrivilege.READ_WRITE,
+        matcher.getAccessPrivilege(address1, hostname1));
+    Assert.assertFalse(AccessPrivilege.READ_WRITE == matcher
+        .getAccessPrivilege(address2, hostname1));
+  }
+
+  @Test
+  public void testExactAddressRO() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, address1);
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, hostname1));
+    Assert.assertEquals(AccessPrivilege.NONE,
+        matcher.getAccessPrivilege(address2, hostname1));
+  }
+
+  @Test
+  public void testExactHostRW() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, hostname1
+        + " rw");
+    Assert.assertEquals(AccessPrivilege.READ_WRITE,
+        matcher.getAccessPrivilege(address1, hostname1));
+  }
+
+  @Test
+  public void testExactHostRO() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, hostname1);
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, hostname1));
+  }
+
+  @Test
+  public void testCidrShortRW() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
+        "192.168.0.0/22 rw");
+    Assert.assertEquals(AccessPrivilege.READ_WRITE,
+        matcher.getAccessPrivilege(address1, hostname1));
+    Assert.assertEquals(AccessPrivilege.NONE,
+        matcher.getAccessPrivilege(address2, hostname1));
+  }
+
+  @Test
+  public void testCidrShortRO() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
+        "192.168.0.0/22");
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, hostname1));
+    Assert.assertEquals(AccessPrivilege.NONE,
+        matcher.getAccessPrivilege(address2, hostname1));
+  }
+
+  @Test
+  public void testCidrLongRW() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, 
+        "192.168.0.0/255.255.252.0 rw");
+    Assert.assertEquals(AccessPrivilege.READ_WRITE,
+        matcher.getAccessPrivilege(address1, hostname1));
+    Assert.assertEquals(AccessPrivilege.NONE,
+        matcher.getAccessPrivilege(address2, hostname1));
+  }
+
+  @Test
+  public void testCidrLongRO() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, 
+        "192.168.0.0/255.255.252.0");
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, hostname1));
+    Assert.assertEquals(AccessPrivilege.NONE,
+        matcher.getAccessPrivilege(address2, hostname1));
+  }
+
+  @Test
+  public void testRegexIPRW() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
+        "192.168.0.[0-9]+ rw");
+    Assert.assertEquals(AccessPrivilege.READ_WRITE,
+        matcher.getAccessPrivilege(address1, hostname1));
+    Assert.assertEquals(AccessPrivilege.NONE,
+        matcher.getAccessPrivilege(address2, hostname1));
+  }
+
+  @Test
+  public void testRegexIPRO() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
+        "192.168.0.[0-9]+");
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, hostname1));
+    Assert.assertEquals(AccessPrivilege.NONE,
+        matcher.getAccessPrivilege(address2, hostname1));
+  }
+
+  @Test
+  public void testRegexHostRW() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
+        "[a-z]+.b.com rw");
+    Assert.assertEquals(AccessPrivilege.READ_WRITE,
+        matcher.getAccessPrivilege(address1, hostname1));
+    // address1 will hit the cache
+    Assert.assertEquals(AccessPrivilege.READ_WRITE,
+        matcher.getAccessPrivilege(address1, hostname2));
+  }
+
+  @Test
+  public void testRegexHostRO() {
+    NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
+        "[a-z]+.b.com");
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, hostname1));
+    // address1 will hit the cache
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, hostname2));
+  }
+  
+  @Test
+  public void testMultiMatchers() throws Exception {
+    long shortExpirationPeriod = 1 * 1000 * 1000 * 1000; // 1s
+    NfsExports matcher = new NfsExports(CacheSize, shortExpirationPeriod, 
+        "192.168.0.[0-9]+;[a-z]+.b.com rw");
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, hostname2));
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, address1));
+    Assert.assertEquals(AccessPrivilege.READ_ONLY,
+        matcher.getAccessPrivilege(address1, hostname1));
+    Assert.assertEquals(AccessPrivilege.READ_WRITE,
+        matcher.getAccessPrivilege(address2, hostname1));
+    // address2 will hit the cache
+    Assert.assertEquals(AccessPrivilege.READ_WRITE,
+        matcher.getAccessPrivilege(address2, hostname2));
+    
+    Thread.sleep(1000);
+    // no cache for address2 now
+    Assert.assertEquals(AccessPrivilege.NONE,
+        matcher.getAccessPrivilege(address2, address2));
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1517040&r1=1517039&r2=1517040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 23 21:14:43 2013
@@ -304,6 +304,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-5069 Include hadoop-nfs and hadoop-hdfs-nfs into hadoop dist for
     NFS deployment (brandonli)
 
+    HDFS-4947 Add NFS server export table to control export by hostname or
+    IP range (Jing Zhao via brandonli)
+
   IMPROVEMENTS
 
     HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may