You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/09/13 23:28:01 UTC

svn commit: r1523110 - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/had...

Author: arp
Date: Fri Sep 13 21:27:58 2013
New Revision: 1523110

URL: http://svn.apache.org/r1523110
Log:
Merging r1521566 through r1523108 from trunk to branch HDFS-2832

Added:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/HdfsAuditLogger.java
      - copied unchanged from r1523108, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/HdfsAuditLogger.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java
      - copied unchanged from r1523108, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestLossyRetryInvocationHandler.java
      - copied unchanged from r1523108, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestLossyRetryInvocationHandler.java
Modified:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1521566-1523108

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Fri Sep 13 21:27:58 2013
@@ -32,10 +32,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.mount.MountEntry;
 import org.apache.hadoop.mount.MountInterface;
 import org.apache.hadoop.mount.MountResponse;
+import org.apache.hadoop.nfs.AccessPrivilege;
+import org.apache.hadoop.nfs.NfsExports;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
-import org.apache.hadoop.nfs.security.AccessPrivilege;
-import org.apache.hadoop.nfs.security.NfsExports;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
 import org.apache.hadoop.oncrpc.RpcProgram;

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java Fri Sep 13 21:27:58 2013
@@ -49,7 +49,7 @@ public class Nfs3Utils {
 
   public static HdfsFileStatus getFileStatus(DFSClient client, String fileIdPath)
       throws IOException {
-    return client.getFileInfo(fileIdPath);
+    return client.getFileLinkInfo(fileIdPath);
   }
 
   public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus(
@@ -59,7 +59,10 @@ public class Nfs3Utils {
      * client takes only the lower 32bit of the fileId and treats it as signed
      * int. When the 32th bit is 1, the client considers it invalid.
      */
-    return new Nfs3FileAttributes(fs.isDir(), fs.getChildrenNum(), fs
+    NfsFileType fileType = fs.isDir() ? NfsFileType.NFSDIR : NfsFileType.NFSREG;
+    fileType = fs.isSymlink() ? NfsFileType.NFSLNK : fileType;
+    
+    return new Nfs3FileAttributes(fileType, fs.getChildrenNum(), fs
         .getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()),
         iug.getGidAllowingUnknown(fs.getGroup()), fs.getLen(), 0 /* fsid */,
         fs.getFileId(), fs.getModificationTime(), fs.getAccessTime());

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Fri Sep 13 21:27:58 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
@@ -42,6 +43,9 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.nfs.AccessPrivilege;
+import org.apache.hadoop.nfs.NfsExports;
+import org.apache.hadoop.nfs.NfsFileType;
 import org.apache.hadoop.nfs.NfsTime;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -63,10 +67,12 @@ import org.apache.hadoop.nfs.nfs3.reques
 import org.apache.hadoop.nfs.nfs3.request.READ3Request;
 import org.apache.hadoop.nfs.nfs3.request.READDIR3Request;
 import org.apache.hadoop.nfs.nfs3.request.READDIRPLUS3Request;
+import org.apache.hadoop.nfs.nfs3.request.READLINK3Request;
 import org.apache.hadoop.nfs.nfs3.request.REMOVE3Request;
 import org.apache.hadoop.nfs.nfs3.request.RENAME3Request;
 import org.apache.hadoop.nfs.nfs3.request.RMDIR3Request;
 import org.apache.hadoop.nfs.nfs3.request.SETATTR3Request;
+import org.apache.hadoop.nfs.nfs3.request.SYMLINK3Request;
 import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
 import org.apache.hadoop.nfs.nfs3.request.SetAttr3.SetAttrField;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
@@ -96,16 +102,18 @@ import org.apache.hadoop.nfs.nfs3.respon
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccAttr;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
-import org.apache.hadoop.nfs.security.AccessPrivilege;
-import org.apache.hadoop.nfs.security.NfsExports;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
-import org.apache.hadoop.oncrpc.RpcAuthSys;
 import org.apache.hadoop.oncrpc.RpcCall;
 import org.apache.hadoop.oncrpc.RpcDeniedReply;
 import org.apache.hadoop.oncrpc.RpcProgram;
 import org.apache.hadoop.oncrpc.RpcReply;
 import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.CredentialsSys;
+import org.apache.hadoop.oncrpc.security.Credentials;
+import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.SecurityHandler;
+import org.apache.hadoop.oncrpc.security.SysSecurityHandler;
+import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
 import org.apache.hadoop.security.AccessControlException;
 import org.jboss.netty.channel.Channel;
 
@@ -205,8 +213,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public GETATTR3Response getattr(XDR xdr, RpcAuthSys authSys,
-      InetAddress client) {
+  public GETATTR3Response getattr(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -214,8 +222,7 @@ public class RpcProgramNfs3 extends RpcP
       return response;
     }
     
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -268,9 +275,9 @@ public class RpcProgramNfs3 extends RpcP
     if (updateFields.contains(SetAttrField.UID)
         || updateFields.contains(SetAttrField.GID)) {
       String uname = updateFields.contains(SetAttrField.UID) ? iug.getUserName(
-          newAttr.getUid(), UNKNOWN_USER) : null;
+          newAttr.getUid(), Nfs3Constant.UNKNOWN_USER) : null;
       String gname = updateFields.contains(SetAttrField.GID) ? iug
-          .getGroupName(newAttr.getGid(), UNKNOWN_GROUP) : null;
+          .getGroupName(newAttr.getGid(), Nfs3Constant.UNKNOWN_GROUP) : null;
       dfsClient.setOwner(fileIdPath, uname, gname);
     }
 
@@ -287,11 +294,10 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SETATTR3Response setattr(XDR xdr, RpcAuthSys authSys,
-      InetAddress client) {
+  public SETATTR3Response setattr(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -364,7 +370,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public LOOKUP3Response lookup(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public LOOKUP3Response lookup(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -372,8 +379,7 @@ public class RpcProgramNfs3 extends RpcP
       return response;
     }
     
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -426,7 +432,8 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public ACCESS3Response access(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public ACCESS3Response access(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -434,8 +441,7 @@ public class RpcProgramNfs3 extends RpcP
       return response;
     }
     
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -464,8 +470,8 @@ public class RpcProgramNfs3 extends RpcP
         LOG.error("Can't get path for fileId:" + handle.getFileId());
         return new ACCESS3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      int access = Nfs3Utils.getAccessRightsForUserGroup(authSys.getUid(),
-          authSys.getGid(), attrs);
+      int access = Nfs3Utils.getAccessRightsForUserGroup(
+          securityHandler.getUid(), securityHandler.getGid(), attrs);
       
       return new ACCESS3Response(Nfs3Status.NFS3_OK, attrs, access);
     } catch (IOException e) {
@@ -474,13 +480,75 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READLINK3Response readlink(XDR xdr, RpcAuthSys authSys,
+  public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
-    return new READLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
+    READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
+
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    if (dfsClient == null) {
+      response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
+      return response;
+    }
+
+    READLINK3Request request = null;
+
+    try {
+      request = new READLINK3Request(xdr);
+    } catch (IOException e) {
+      LOG.error("Invalid READLINK request");
+      return new READLINK3Response(Nfs3Status.NFS3ERR_INVAL);
+    }
+
+    FileHandle handle = request.getHandle();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("NFS READLINK fileId: " + handle.getFileId());
+    }
+
+    String fileIdPath = Nfs3Utils.getFileIdPath(handle);
+    try {
+      String target = dfsClient.getLinkTarget(fileIdPath);
+
+      Nfs3FileAttributes postOpAttr = Nfs3Utils.getFileAttr(dfsClient,
+          fileIdPath, iug);
+      if (postOpAttr == null) {
+        LOG.info("Can't get path for fileId:" + handle.getFileId());
+        return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
+      }
+      if (postOpAttr.getType() != NfsFileType.NFSLNK.toValue()) {
+        LOG.error("Not a symlink, fileId:" + handle.getFileId());
+        return new READLINK3Response(Nfs3Status.NFS3ERR_INVAL);
+      }
+      if (target == null) {
+        LOG.error("Symlink target should not be null, fileId:"
+            + handle.getFileId());
+        return new READLINK3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
+      }
+      if (MAX_READ_TRANSFER_SIZE < target.getBytes().length) {
+        return new READLINK3Response(Nfs3Status.NFS3ERR_IO, postOpAttr, null);
+      }
+
+      return new READLINK3Response(Nfs3Status.NFS3_OK, postOpAttr,
+          target.getBytes());
+
+    } catch (IOException e) {
+      LOG.warn("Readlink error: " + e.getClass(), e);
+      if (e instanceof FileNotFoundException) {
+        return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
+      } else if (e instanceof AccessControlException) {
+        return new READLINK3Response(Nfs3Status.NFS3ERR_ACCES);
+      }
+      return new READLINK3Response(Nfs3Status.NFS3ERR_IO);
+    }
   }
 
   @Override
-  public READ3Response read(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public READ3Response read(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -488,8 +556,7 @@ public class RpcProgramNfs3 extends RpcP
       return response;
     }
     
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -534,8 +601,8 @@ public class RpcProgramNfs3 extends RpcP
         }
         return new READ3Response(Nfs3Status.NFS3ERR_NOENT);
       }
-      int access = Nfs3Utils.getAccessRightsForUserGroup(authSys.getUid(),
-          authSys.getGid(), attrs);
+      int access = Nfs3Utils.getAccessRightsForUserGroup(
+          securityHandler.getUid(), securityHandler.getGid(), attrs);
       if ((access & Nfs3Constant.ACCESS3_READ) != 0) {
         eof = offset < attrs.getSize() ? false : true;
         return new READ3Response(Nfs3Status.NFS3_OK, attrs, 0, eof,
@@ -578,10 +645,10 @@ public class RpcProgramNfs3 extends RpcP
 
   @Override
   public WRITE3Response write(XDR xdr, Channel channel, int xid,
-      RpcAuthSys authSys, InetAddress client) {
+      SecurityHandler securityHandler, InetAddress client) {
     WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -653,10 +720,10 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public CREATE3Response create(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public CREATE3Response create(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -725,7 +792,7 @@ public class RpcProgramNfs3 extends RpcP
         // Set group if it's not specified in the request.
         if (!setAttr3.getUpdateFields().contains(SetAttrField.GID)) {
           setAttr3.getUpdateFields().add(SetAttrField.GID);
-          setAttr3.setGid(authSys.getGid());
+          setAttr3.setGid(securityHandler.getGid());
         }
         setattrInternal(dfsClient, fileIdPath, setAttr3, false);
       }
@@ -776,10 +843,10 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public MKDIR3Response mkdir(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -834,7 +901,7 @@ public class RpcProgramNfs3 extends RpcP
       // Set group if it's not specified in the request.
       if (!setAttr3.getUpdateFields().contains(SetAttrField.GID)) {
         setAttr3.getUpdateFields().add(SetAttrField.GID);
-        setAttr3.setGid(authSys.getGid());
+        setAttr3.setGid(securityHandler.getGid());
       }
       setattrInternal(dfsClient, fileIdPath, setAttr3, false);
       
@@ -866,15 +933,16 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READDIR3Response mknod(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public READDIR3Response mknod(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
   
   @Override
-  public REMOVE3Response remove(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public REMOVE3Response remove(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -947,10 +1015,10 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public RMDIR3Response rmdir(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1030,10 +1098,10 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public RENAME3Response rename(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public RENAME3Response rename(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1118,18 +1186,72 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys,
+  public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
-    return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
+    SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
+
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    if (dfsClient == null) {
+      response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
+      return response;
+    }
+
+    SYMLINK3Request request = null;
+    try {
+      request = new SYMLINK3Request(xdr);
+    } catch (IOException e) {
+      LOG.error("Invalid SYMLINK request");
+      response.setStatus(Nfs3Status.NFS3ERR_INVAL);
+      return response;
+    }
+
+    FileHandle dirHandle = request.getHandle();
+    String name = request.getName();
+    String symData = request.getSymData();
+    String linkDirIdPath = Nfs3Utils.getFileIdPath(dirHandle);
+    // Don't do any name check to source path, just leave it to HDFS
+    String linkIdPath = linkDirIdPath + "/" + name;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("NFS SYMLINK, target: " + symData + " link: " + linkIdPath);
+    }
+
+    try {
+      WccData dirWcc = response.getDirWcc();
+      WccAttr preOpAttr = Nfs3Utils.getWccAttr(dfsClient, linkDirIdPath);
+      dirWcc.setPreOpAttr(preOpAttr);
+
+      dfsClient.createSymlink(symData, linkIdPath, false);
+      // Set symlink attr is considered as to change the attr of the target
+      // file. So no need to set symlink attr here after it's created.
+
+      HdfsFileStatus linkstat = dfsClient.getFileLinkInfo(linkIdPath);
+      Nfs3FileAttributes objAttr = Nfs3Utils.getNfs3FileAttrFromFileStatus(
+          linkstat, iug);
+      dirWcc
+          .setPostOpAttr(Nfs3Utils.getFileAttr(dfsClient, linkDirIdPath, iug));
+
+      return new SYMLINK3Response(Nfs3Status.NFS3_OK, new FileHandle(
+          objAttr.getFileid()), objAttr, dirWcc);
+
+    } catch (IOException e) {
+      LOG.warn("Exception:" + e);
+      response.setStatus(Nfs3Status.NFS3ERR_IO);
+      return response;
+    }
   }
 
-  public READDIR3Response link(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
   @Override
-  public READDIR3Response readdir(XDR xdr, RpcAuthSys authSys,
-      InetAddress client) {
+  public READDIR3Response readdir(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1137,8 +1259,7 @@ public class RpcProgramNfs3 extends RpcP
       return response;
     }
     
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1269,14 +1390,13 @@ public class RpcProgramNfs3 extends RpcP
         dirStatus.getModificationTime(), dirList);
   }
 
-  public READDIRPLUS3Response readdirplus(XDR xdr, RpcAuthSys authSys,
-      InetAddress client) {
+  public READDIRPLUS3Response readdirplus(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
     }
     
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
     }
@@ -1420,7 +1540,8 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public FSSTAT3Response fsstat(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public FSSTAT3Response fsstat(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1428,8 +1549,7 @@ public class RpcProgramNfs3 extends RpcP
       return response;
     }
     
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1478,7 +1598,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public FSINFO3Response fsinfo(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public FSINFO3Response fsinfo(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1486,8 +1607,7 @@ public class RpcProgramNfs3 extends RpcP
       return response;
     }
     
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1530,8 +1650,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public PATHCONF3Response pathconf(XDR xdr, RpcAuthSys authSys,
-      InetAddress client) {
+  public PATHCONF3Response pathconf(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1539,8 +1659,7 @@ public class RpcProgramNfs3 extends RpcP
       return response;
     }
     
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1578,10 +1697,10 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public COMMIT3Response commit(XDR xdr, RpcAuthSys authSys, InetAddress client) {
+  public COMMIT3Response commit(XDR xdr,
+      SecurityHandler securityHandler, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
-    String uname = authSysCheck(authSys);
-    DFSClient dfsClient = clientCache.get(uname);
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1645,12 +1764,15 @@ public class RpcProgramNfs3 extends RpcP
           Nfs3Constant.WRITE_COMMIT_VERF);
     }
   }
-  
-  private final static String UNKNOWN_USER = "nobody";
-  private final static String UNKNOWN_GROUP = "nobody";
 
-  private String authSysCheck(RpcAuthSys authSys) {
-    return iug.getUserName(authSys.getUid(), UNKNOWN_USER);
+  private SecurityHandler getSecurityHandler(Credentials credentials,
+      Verifier verifier) {
+    if (credentials instanceof CredentialsSys) {
+      return new SysSecurityHandler((CredentialsSys) credentials, iug);
+    } else {
+      // TODO: support GSS and handle other cases
+      return null;
+    }
   }
   
   @Override
@@ -1658,67 +1780,71 @@ public class RpcProgramNfs3 extends RpcP
       InetAddress client, Channel channel) {
     final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
-    RpcAuthSys authSys = null;
-    
+
+    Credentials credentials = rpcCall.getCredential();
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
-      if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS) {
+      if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS
+          && rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) {
         LOG.info("Wrong RPC AUTH flavor, "
-            + rpcCall.getCredential().getFlavor() + " is not AUTH_SYS.");
+            + rpcCall.getCredential().getFlavor()
+            + " is not AUTH_SYS or RPCSEC_GSS.");
         XDR reply = new XDR();
         reply = RpcDeniedReply.voidReply(reply, xid,
             RpcReply.ReplyState.MSG_ACCEPTED,
             RpcDeniedReply.RejectState.AUTH_ERROR);
         return reply;
       }
-      authSys = RpcAuthSys.from(rpcCall.getCredential().getBody());
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(credentials,
+        rpcCall.getVerifier());
+    
     NFS3Response response = null;
     if (nfsproc3 == NFSPROC3.NULL) {
       response = nullProcedure();
     } else if (nfsproc3 == NFSPROC3.GETATTR) {
-      response = getattr(xdr, authSys, client);
+      response = getattr(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.SETATTR) {
-      response = setattr(xdr, authSys, client);
+      response = setattr(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.LOOKUP) {
-      response = lookup(xdr, authSys, client);
+      response = lookup(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.ACCESS) {
-      response = access(xdr, authSys, client);
+      response = access(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.READLINK) {
-      response = readlink(xdr, authSys, client);
+      response = readlink(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.READ) {
-      response = read(xdr, authSys, client);
+      response = read(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.WRITE) {
-      response = write(xdr, channel, xid, authSys, client);
+      response = write(xdr, channel, xid, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.CREATE) {
-      response = create(xdr, authSys, client);
+      response = create(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.MKDIR) {      
-      response = mkdir(xdr, authSys, client);
+      response = mkdir(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.SYMLINK) {
-      response = symlink(xdr, authSys, client);
+      response = symlink(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.MKNOD) {
-      response = mknod(xdr, authSys, client);
+      response = mknod(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.REMOVE) {
-      response = remove(xdr, authSys, client);
+      response = remove(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.RMDIR) {
-      response = rmdir(xdr, authSys, client);
+      response = rmdir(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.RENAME) {
-      response = rename(xdr, authSys, client);
+      response = rename(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.LINK) {
-      response = link(xdr, authSys, client);
+      response = link(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.READDIR) {
-      response = readdir(xdr, authSys, client);
+      response = readdir(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
-      response = readdirplus(xdr, authSys, client);
+      response = readdirplus(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.FSSTAT) {
-      response = fsstat(xdr, authSys, client);
+      response = fsstat(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.FSINFO) {
-      response = fsinfo(xdr, authSys, client);
+      response = fsinfo(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
-      response = pathconf(xdr, authSys, client);
+      response = pathconf(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
-      response = commit(xdr, authSys, client);
+      response = commit(xdr, securityHandler, client);
     } else {
       // Invalid procedure
       RpcAcceptedReply.voidReply(out, xid,

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java Fri Sep 13 21:27:58 2013
@@ -42,7 +42,7 @@ public class TestMountd {
     // Start minicluster
     Configuration config = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(1)
-        .manageNameDfsDirs(false).build();
+        .build();
     cluster.waitActive();
     
     // Start nfs

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Sep 13 21:27:58 2013
@@ -272,6 +272,9 @@ Release 2.3.0 - UNRELEASED
     HDFS-4879. Add "blocked ArrayList" collection to avoid CMS full GCs
     (Todd Lipcon via Colin Patrick McCabe)
 
+    HDFS-4096. Add snapshot information to namenode WebUI. (Haohui Mai via 
+    jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -330,6 +333,11 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-5118. Provide testing support for DFSClient to drop RPC responses.
     (jing9)
 
+    HDFS-5085. Refactor o.a.h.nfs to support different types of 
+    authentications. (jing9)
+
+    HDFS-5067 Support symlink operations in NFS gateway. (brandonli)
+
   IMPROVEMENTS
 
     HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may
@@ -364,6 +372,8 @@ Release 2.1.1-beta - UNRELEASED
 
     HDFS-5150. Allow per NN SPN for internal SPNEGO. (kihwal)
 
+    HDFS-4680. Audit logging of delegation tokens for MR tracing. (Andrew Wang)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -432,6 +442,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-5159. Secondary NameNode fails to checkpoint if error occurs
     downloading edits on first checkpoint. (atm)
 
+    HDFS-5192. NameNode may fail to start when 
+    dfs.client.test.drop.namenode.response.number is set. (jing9)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1521566-1523108

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 13 21:27:58 2013
@@ -484,14 +484,17 @@ public class DFSClient implements java.i
     int numResponseToDrop = conf.getInt(
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
+    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
     if (numResponseToDrop > 0) {
       // This case is used for testing.
       LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
           + " is set to " + numResponseToDrop
           + ", this hacked client will proactively drop responses");
-      NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies
-          .createProxyWithLossyRetryHandler(conf, nameNodeUri,
-              ClientProtocol.class, numResponseToDrop);
+      proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
+          nameNodeUri, ClientProtocol.class, numResponseToDrop);
+    }
+    
+    if (proxyInfo != null) {
       this.dtService = proxyInfo.getDelegationTokenService();
       this.namenode = proxyInfo.getProxy();
     } else if (rpcNamenode != null) {
@@ -502,9 +505,8 @@ public class DFSClient implements java.i
     } else {
       Preconditions.checkArgument(nameNodeUri != null,
           "null URI");
-      NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo =
-        NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
-      
+      proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
+          ClientProtocol.class);
       this.dtService = proxyInfo.getDelegationTokenService();
       this.namenode = proxyInfo.getProxy();
     }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Sep 13 21:27:58 2013
@@ -267,6 +267,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
   public static final String  DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
   public static final String  DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
+  public static final String  DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
+  public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
 
   // Much code in hdfs is not yet updated to use these keys.
   public static final String  DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Fri Sep 13 21:27:58 2013
@@ -158,8 +158,8 @@ public class NameNodeProxies {
    * Generate a dummy namenode proxy instance that utilizes our hacked
    * {@link LossyRetryInvocationHandler}. Proxy instance generated using this
    * method will proactively drop RPC responses. Currently this method only
-   * support HA setup. IllegalStateException will be thrown if the given
-   * configuration is not for HA.
+   * support HA setup. null will be returned if the given configuration is not 
+   * for HA.
    * 
    * @param config the configuration containing the required IPC
    *        properties, client failover configurations, etc.
@@ -168,7 +168,8 @@ public class NameNodeProxies {
    * @param xface the IPC interface which should be created
    * @param numResponseToDrop The number of responses to drop for each RPC call
    * @return an object containing both the proxy and the associated
-   *         delegation token service it corresponds to
+   *         delegation token service it corresponds to. Will return null of the
+   *         given configuration does not support HA.
    * @throws IOException if there is an error creating the proxy
    */
   @SuppressWarnings("unchecked")
@@ -204,8 +205,9 @@ public class NameNodeProxies {
       Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
       return new ProxyAndInfo<T>(proxy, dtService);
     } else {
-      throw new IllegalStateException("Currently creating proxy using " +
+      LOG.warn("Currently creating proxy using " +
       		"LossyRetryInvocationHandler requires NN HA setup");
+      return null;
     }
   }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java Fri Sep 13 21:27:58 2013
@@ -58,6 +58,15 @@ public class DelegationTokenSecretManage
       .getLog(DelegationTokenSecretManager.class);
   
   private final FSNamesystem namesystem;
+
+  public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+      long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+      long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
+    this(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+        delegationTokenRenewInterval, delegationTokenRemoverScanInterval, false,
+        namesystem);
+  }
+
   /**
    * Create a secret manager
    * @param delegationKeyUpdateInterval the number of seconds for rolling new
@@ -67,13 +76,16 @@ public class DelegationTokenSecretManage
    * @param delegationTokenRenewInterval how often the tokens must be renewed
    * @param delegationTokenRemoverScanInterval how often the tokens are scanned
    *        for expired tokens
+   * @param storeTokenTrackingId whether to store the token's tracking id
    */
   public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
       long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
-      long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
+      long delegationTokenRemoverScanInterval, boolean storeTokenTrackingId,
+      FSNamesystem namesystem) {
     super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
         delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
     this.namesystem = namesystem;
+    this.storeTokenTrackingId = storeTokenTrackingId;
   }
 
   @Override //SecretManager
@@ -184,7 +196,7 @@ public class DelegationTokenSecretManage
     }
     if (currentTokens.get(identifier) == null) {
       currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
-          password));
+          password, getTrackingIdIfEnabled(identifier)));
     } else {
       throw new IOException(
           "Same delegation token being added twice; invalid entry in fsimage or editlogs");
@@ -223,7 +235,7 @@ public class DelegationTokenSecretManage
       byte[] password = createPassword(identifier.getBytes(), allKeys
           .get(keyId).getKey());
       currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
-          password));
+          password, getTrackingIdIfEnabled(identifier)));
     }
   }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Sep 13 21:27:58 2013
@@ -36,6 +36,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
@@ -227,6 +229,8 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -318,8 +322,14 @@ public class FSNamesystem implements Nam
           stat.getGroup(), symlink, path);
     }
     for (AuditLogger logger : auditLoggers) {
-      logger.logAuditEvent(succeeded, ugi.toString(), addr,
-          cmd, src, dst, status);
+      if (logger instanceof HdfsAuditLogger) {
+        HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
+        hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
+            status, ugi, dtSecretManager);
+      } else {
+        logger.logAuditEvent(succeeded, ugi.toString(), addr,
+            cmd, src, dst, status);
+      }
     }
   }
 
@@ -4209,6 +4219,15 @@ public class FSNamesystem implements Nam
     return this.snapshotManager.getNumSnapshots();
   }
 
+  @Override
+  public String getSnapshotStats() {
+    Map<String, Object> info = new HashMap<String, Object>();
+    info.put("SnapshottableDirectories", this.getNumSnapshottableDirs());
+    info.put("Snapshots", this.getNumSnapshots());
+    return JSON.toString(info);
+  }
+
+
   int getNumberOfDatanodes(DatanodeReportType type) {
     readLock();
     try {
@@ -5921,7 +5940,10 @@ public class FSNamesystem implements Nam
             DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT),
         conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
             DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT),
-        DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL, this);
+        DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL,
+        conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
+            DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT),
+        this);
   }
 
   /**
@@ -6832,17 +6854,22 @@ public class FSNamesystem implements Nam
    * defined in the config file. It can also be explicitly listed in the
    * config file.
    */
-  private static class DefaultAuditLogger implements AuditLogger {
+  private static class DefaultAuditLogger extends HdfsAuditLogger {
+
+    private boolean logTokenTrackingId;
 
     @Override
     public void initialize(Configuration conf) {
-      // Nothing to do.
+      logTokenTrackingId = conf.getBoolean(
+          DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
+          DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT);
     }
 
     @Override
     public void logAuditEvent(boolean succeeded, String userName,
         InetAddress addr, String cmd, String src, String dst,
-        FileStatus status) {
+        FileStatus status, UserGroupInformation ugi,
+        DelegationTokenSecretManager dtSecretManager) {
       if (auditLog.isInfoEnabled()) {
         final StringBuilder sb = auditBuffer.get();
         sb.setLength(0);
@@ -6860,6 +6887,22 @@ public class FSNamesystem implements Nam
           sb.append(status.getGroup()).append(":");
           sb.append(status.getPermission());
         }
+        if (logTokenTrackingId) {
+          sb.append("\t").append("trackingId=");
+          String trackingId = null;
+          if (ugi != null && dtSecretManager != null
+              && ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {
+            for (TokenIdentifier tid: ugi.getTokenIdentifiers()) {
+              if (tid instanceof DelegationTokenIdentifier) {
+                DelegationTokenIdentifier dtid =
+                    (DelegationTokenIdentifier)tid;
+                trackingId = dtSecretManager.getTokenTrackingId(dtid);
+                break;
+              }
+            }
+          }
+          sb.append(trackingId);
+        }
         auditLog.info(sb);
       }
     }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Fri Sep 13 21:27:58 2013
@@ -204,6 +204,17 @@ class NamenodeJspHelper {
     return "";
   }
 
+  static void generateSnapshotReport(JspWriter out, FSNamesystem fsn)
+      throws IOException {
+    out.println("<div id=\"snapshotstats\"><div class=\"dfstable\">"
+        + "<table class=\"storage\" title=\"Snapshot Summary\">\n"
+        + "<thead><tr><td><b>Snapshottable directories</b></td>"
+        + "<td><b>Snapshotted directories</b></td></tr></thead>");
+
+    out.println(String.format("<td>%d</td><td>%d</td>", fsn.getNumSnapshottableDirs(), fsn.getNumSnapshots()));
+    out.println("</table></div></div>");
+  }
+
   static class HealthJsp {
     private int rowNum = 0;
     private int colNum = 0;

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java Fri Sep 13 21:27:58 2013
@@ -130,4 +130,9 @@ public interface FSNamesystemMBean {
    * @return number of decommissioned dead data nodes
    */
   public int getNumDecomDeadDataNodes();
+
+  /**
+   * The statistics of snapshots
+   */
+  public String getSnapshotStats();
 }

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1521566-1523108

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1521566-1523108

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1521566-1523108

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp?rev=1523110&r1=1523109&r2=1523110&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp Fri Sep 13 21:27:58 2013
@@ -73,7 +73,10 @@
 <% healthjsp.generateJournalReport(out, nn, request); %>
 <hr/>
 <% healthjsp.generateConfReport(out, nn, request); %>
-<hr>
+<hr/>
+<h3>Snapshot Summary</h3>
+<% NamenodeJspHelper.generateSnapshotReport(out, fsn); %>
+<hr/>
 <h3>Startup Progress</h3>
 <% healthjsp.generateStartupProgress(out, nn.getStartupProgress()); %>
 <%

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1521566-1523108

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1521566-1523108