You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by um...@apache.org on 2014/05/02 12:57:28 UTC

svn commit: r1591866 - in /hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/server/namenode/

Author: umamahesh
Date: Fri May  2 10:57:27 2014
New Revision: 1591866

URL: http://svn.apache.org/r1591866
Log:
HDFS-6258. Namenode server-side storage for XAttrs. Contributed by Yi Liu.

Added:
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNConf.java
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeXAttr.java
Removed:
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclConfigFlag.java
Modified:
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-2006.txt
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

Modified: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-2006.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-2006.txt?rev=1591866&r1=1591865&r2=1591866&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-2006.txt (original)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-2006.txt Fri May  2 10:57:27 2014
@@ -14,6 +14,8 @@ HDFS-2006 (Unreleased)
 
    HDFS-6309. Javadocs for Xattrs apis in DFSClient and other minor fixups. (Charles Lamb via umamahesh)
 
+   HDFS-6258. Namenode server-side storage for XAttrs. (Yi Liu via umamahesh)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1591866&r1=1591865&r2=1591866&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri May  2 10:57:27 2014
@@ -192,6 +192,10 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT = "supergroup";
   public static final String  DFS_NAMENODE_ACLS_ENABLED_KEY = "dfs.namenode.acls.enabled";
   public static final boolean DFS_NAMENODE_ACLS_ENABLED_DEFAULT = false;
+  public static final String  DFS_NAMENODE_XATTRS_ENABLED_KEY = "dfs.namenode.xattrs.enabled";
+  public static final boolean DFS_NAMENODE_XATTRS_ENABLED_DEFAULT = true;
+  public static final String  DFS_NAMENODE_INODE_XATTRS_MAX_LIMIT_KEY = "dfs.namenode.inode.xattrs.max-limit";
+  public static final int     DFS_NAMENODE_INODE_XATTRS_MAX_LIMIT_DEFAULT = 32;
   public static final String  DFS_ADMIN = "dfs.cluster.administrators";
   public static final String  DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.https.server.keystore.resource";
   public static final String  DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-server.xml";

Modified: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1591866&r1=1591865&r2=1591866&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri May  2 10:57:27 2014
@@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -39,6 +40,8 @@ import org.apache.hadoop.fs.ParentNotDir
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -78,6 +81,7 @@ import org.apache.hadoop.hdfs.util.ReadO
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /*************************************************
  * FSDirectory stores the filesystem directory state.
@@ -124,6 +128,7 @@ public class FSDirectory implements Clos
   private final int contentCountLimit; // max content summary counts per run
   private final INodeMap inodeMap; // Synchronized by dirLock
   private long yieldCount = 0; // keep track of lock yield count.
+  private final int inodeXAttrsLimit; //inode xattrs max limit
 
   // lock to protect the directory and BlockMap
   private final ReentrantReadWriteLock dirLock;
@@ -189,6 +194,9 @@ public class FSDirectory implements Clos
     this.maxDirItems = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
+    this.inodeXAttrsLimit = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_INODE_XATTRS_MAX_LIMIT_KEY,
+        DFSConfigKeys.DFS_NAMENODE_INODE_XATTRS_MAX_LIMIT_DEFAULT);
     // We need a maximum maximum because by default, PB limits message sizes
     // to 64MB. This means we can only store approximately 6.7 million entries
     // per directory, but let's use 6.4 million for some safety.
@@ -348,6 +356,7 @@ public class FSDirectory implements Clos
                             String path, 
                             PermissionStatus permissions,
                             List<AclEntry> aclEntries,
+                            List<XAttr> xAttrs,
                             short replication,
                             long modificationTime,
                             long atime,
@@ -374,6 +383,10 @@ public class FSDirectory implements Clos
           AclStorage.updateINodeAcl(newNode, aclEntries,
             Snapshot.CURRENT_STATE_ID);
         }
+        if (xAttrs != null) {
+          XAttrStorage.updateINodeXAttrs(newNode, 
+              xAttrs, Snapshot.CURRENT_STATE_ID);
+        }
         return newNode;
       }
     } catch (IOException e) {
@@ -2877,6 +2890,122 @@ public class FSDirectory implements Clos
       readUnlock();
     }
   }
+  
+  void removeXAttr(String src, XAttr xAttr) throws IOException {
+    writeLock();
+    try {
+      unprotectedRemoveXAttr(src, xAttr);
+      //TODO: Recording XAttrs modifications to edit log will be 
+      //implemented as part of HDFS-6301
+    } finally {
+      writeUnlock();
+    }
+  }
+  
+  private List<XAttr> unprotectedRemoveXAttr(String src,
+      XAttr xAttr) throws IOException {
+    assert hasWriteLock();
+    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INode inode = resolveLastINode(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+    List<XAttr> newXAttrs = filterINodeXAttr(existingXAttrs, xAttr);
+    XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
+    
+    return newXAttrs;
+  }
+  
+  List<XAttr> filterINodeXAttr(List<XAttr> existingXAttrs, 
+      XAttr xAttr) throws QuotaExceededException {
+    if (existingXAttrs == null || existingXAttrs.isEmpty()) {
+      return existingXAttrs;
+    }
+    
+    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(existingXAttrs.size());
+    for (XAttr a : existingXAttrs) {
+      if (!(a.getNameSpace() == xAttr.getNameSpace()
+          && a.getName().equalsIgnoreCase(xAttr.getName()))) {
+        xAttrs.add(a);
+      }
+    }
+    
+    return xAttrs;
+  }
+  
+  void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag) 
+      throws IOException {
+    writeLock();
+    try {
+      unprotectedSetXAttr(src, xAttr, flag);
+      //TODO: Recording XAttrs modifications to edit log will be 
+      //implemented as part of HDFS-6301
+    } finally {
+      writeUnlock();
+    }
+  }
+  
+  List<XAttr> unprotectedSetXAttr(String src, XAttr xAttr, 
+      EnumSet<XAttrSetFlag> flag) throws IOException {
+    assert hasWriteLock();
+    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INode inode = resolveLastINode(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+    List<XAttr> newXAttrs = setINodeXAttr(existingXAttrs, xAttr, flag);
+    XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
+    
+    return newXAttrs;
+  }
+  
+  List<XAttr> setINodeXAttr(List<XAttr> existingXAttrs, XAttr xAttr, 
+      EnumSet<XAttrSetFlag> flag) throws QuotaExceededException, IOException {
+    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(
+        existingXAttrs != null ? existingXAttrs.size() + 1 : 1);
+    boolean exist = false;
+    if (existingXAttrs != null) {
+      for (XAttr a: existingXAttrs) {
+        if ((a.getNameSpace() == xAttr.getNameSpace()
+            && a.getName().equalsIgnoreCase(xAttr.getName()))) {
+          exist = true;
+        } else {
+          xAttrs.add(a);
+        }
+      }
+    }
+    
+    XAttrSetFlag.validate(xAttr.getName(), exist, flag);
+    xAttrs.add(xAttr);
+    
+    if (xAttrs.size() > inodeXAttrsLimit) {
+      throw new IOException("Operation fails, XAttrs of " +
+          "inode exceeds maximum limit of " + inodeXAttrsLimit);
+    }
+    
+    return xAttrs;
+  }
+  
+  void unprotectedUpdateXAttrs(String src, List<XAttr> xAttrs) 
+      throws IOException {
+    assert hasWriteLock();
+    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INode inode = resolveLastINode(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    
+    XAttrStorage.updateINodeXAttrs(inode, xAttrs, snapshotId);
+  }
+  
+  List<XAttr> getXAttrs(String src) throws IOException {
+    String srcs = normalizePath(src);
+    readLock();
+    try {
+      INodesInPath iip = rootDir.getLastINodeInPath(srcs, true);
+      INode inode = resolveLastINode(src, iip);
+      int snapshotId = iip.getPathSnapshotId();
+      return XAttrStorage.readINodeXAttrs(inode, snapshotId);
+    } finally {
+      readUnlock();
+    }
+  }
 
   private static INode resolveLastINode(String src, INodesInPath iip)
       throws FileNotFoundException {

Modified: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1591866&r1=1591865&r2=1591866&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri May  2 10:57:27 2014
@@ -350,7 +350,7 @@ public class FSEditLogLoader {
         inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
             lastInodeId);
         newFile = fsDir.unprotectedAddFile(inodeId,
-            path, addCloseOp.permissions, addCloseOp.aclEntries,
+            path, addCloseOp.permissions, addCloseOp.aclEntries, null,
             replication, addCloseOp.mtime, addCloseOp.atime,
             addCloseOp.blockSize, true, addCloseOp.clientName,
             addCloseOp.clientMachine);

Modified: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1591866&r1=1591865&r2=1591866&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri May  2 10:57:27 2014
@@ -140,6 +140,8 @@ import org.apache.hadoop.fs.Options.Rena
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -517,7 +519,7 @@ public class FSNamesystem implements Nam
   
   private final RetryCache retryCache;
 
-  private final AclConfigFlag aclConfigFlag;
+  private final NNConf nnConf;
 
   /**
    * Set the last allocated inode id when fsimage or editlog is loaded. 
@@ -784,7 +786,7 @@ public class FSNamesystem implements Nam
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
       this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
-      this.aclConfigFlag = new AclConfigFlag(conf);
+      this.nnConf = new NNConf(conf);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -7586,7 +7588,7 @@ public class FSNamesystem implements Nam
   }
 
   void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
-    aclConfigFlag.checkForApiCall();
+    nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -7607,7 +7609,7 @@ public class FSNamesystem implements Nam
   }
 
   void removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
-    aclConfigFlag.checkForApiCall();
+    nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -7628,7 +7630,7 @@ public class FSNamesystem implements Nam
   }
 
   void removeDefaultAcl(String src) throws IOException {
-    aclConfigFlag.checkForApiCall();
+    nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -7649,7 +7651,7 @@ public class FSNamesystem implements Nam
   }
 
   void removeAcl(String src) throws IOException {
-    aclConfigFlag.checkForApiCall();
+    nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -7670,7 +7672,7 @@ public class FSNamesystem implements Nam
   }
 
   void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
-    aclConfigFlag.checkForApiCall();
+    nnConf.checkAclsConfigFlag();
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -7691,7 +7693,7 @@ public class FSNamesystem implements Nam
   }
 
   AclStatus getAclStatus(String src) throws IOException {
-    aclConfigFlag.checkForApiCall();
+    nnConf.checkAclsConfigFlag();
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     readLock();
@@ -7705,6 +7707,120 @@ public class FSNamesystem implements Nam
       readUnlock();
     }
   }
+  
+  void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag) 
+      throws IOException {
+    nnConf.checkXAttrsConfigFlag();
+    HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
+    try {
+      XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setXAttr", src);
+      throw e;
+    }
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot set XAttr on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      if (isPermissionEnabled) {
+        checkPathAccess(pc, src, FsAction.WRITE);
+      }
+      
+      dir.setXAttr(src, xAttr, flag);
+      resultingStat = getAuditFileInfo(src, false);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setXAttr", src);
+      throw e;
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "setXAttr", src, null, resultingStat);
+  }
+  
+  List<XAttr> getXAttrs(String src, List<XAttr> xAttrs) throws IOException {
+    nnConf.checkXAttrsConfigFlag();
+    FSPermissionChecker pc = getPermissionChecker();
+    boolean getAll = xAttrs == null || xAttrs.isEmpty();
+    List<XAttr> filteredXAttrs = null;
+    if (!getAll) {
+      filteredXAttrs = XAttrPermissionFilter.filterXAttrsForApi(pc, xAttrs);
+      if (filteredXAttrs.isEmpty()) {
+        return filteredXAttrs;
+      }
+    }
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      if (isPermissionEnabled) {
+        checkPathAccess(pc, src, FsAction.READ);
+      }
+      List<XAttr> all = dir.getXAttrs(src);
+      List<XAttr> filteredAll = XAttrPermissionFilter.
+          filterXAttrsForApi(pc, all);
+      if (getAll) {
+        return filteredAll;
+      } else {
+        if (filteredAll == null || filteredAll.isEmpty()) {
+          return null;
+        }
+        List<XAttr> toGet = Lists.newArrayListWithCapacity(filteredXAttrs.size());
+        for (XAttr xAttr : filteredXAttrs) {
+          for (XAttr a : filteredAll) {
+            if (xAttr.getNameSpace() == a.getNameSpace()
+                && xAttr.getName().equals(a.getName())) {
+              toGet.add(a);
+              break;
+            }
+          }
+        }
+        return toGet;
+      }
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "getXAttrs", src);
+      throw e;
+    } finally {
+      readUnlock();
+    }
+  }
+  
+  void removeXAttr(String src, XAttr xAttr) throws IOException {
+    nnConf.checkXAttrsConfigFlag();
+    HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
+    try {
+      XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "removeXAttr", src);
+      throw e;
+    }
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      if (isPermissionEnabled) {
+        checkPathAccess(pc, src, FsAction.WRITE);
+      }
+      
+      dir.removeXAttr(src, xAttr);
+      resultingStat = getAuditFileInfo(src, false);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "removeXAttr", src);
+      throw e;
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "removeXAttr", src, null, resultingStat);
+  }
 
   /**
    * Default AuditLogger implementation; used when no access logger is

Added: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNConf.java?rev=1591866&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNConf.java (added)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNConf.java Fri May  2 10:57:27 2014
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.AclException;
+
+/**
+ * This class is a common place for NN configuration.
+ */
+final class NNConf {
+  /**
+   * Support for ACLs is controlled by a configuration flag. If the 
+   * configuration flag is false, then the NameNode will reject all 
+   * ACL-related operations.
+   */
+  private final boolean aclsEnabled;
+  
+  /**
+   * Support for XAttrs is controlled by a configuration flag. If the 
+   * configuration flag is false, then the NameNode will reject all 
+   * XAttr-related operations.
+   */
+  private final boolean xattrsEnabled;
+
+  /**
+   * Creates a new NNConf from configuration.
+   *
+   * @param conf Configuration to check
+   */
+  public NNConf(Configuration conf) {
+    aclsEnabled = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
+      DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT);
+    LogFactory.getLog(NNConf.class).info("ACLs enabled? " + aclsEnabled);
+    xattrsEnabled = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, 
+      DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_DEFAULT);
+    LogFactory.getLog(NNConf.class).info("XAttrs enabled? " + xattrsEnabled);
+  }
+
+  /**
+   * Checks the flag on behalf of an ACL API call.
+   *
+   * @throws AclException if ACLs are disabled
+   */
+  public void checkAclsConfigFlag() throws AclException {
+    if (!aclsEnabled) {
+      throw new AclException(String.format(
+        "The ACL operation has been rejected.  "
+        + "Support for ACLs has been disabled by setting %s to false.",
+        DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY));
+    }
+  }
+  
+  /**
+   * Checks the flag on behalf of an XAttr API call.
+   * @throws IOException if XAttrs are disabled
+   */
+  public void checkXAttrsConfigFlag() throws IOException {
+    if (!xattrsEnabled) {
+      throw new IOException(String.format(
+        "The XAttr operation has been rejected.  "
+        + "Support for XAttrs has been disabled by setting %s to false.",
+        DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY));
+    }
+  }
+}

Modified: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1591866&r1=1591865&r2=1591866&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri May  2 10:57:27 2014
@@ -1384,19 +1384,18 @@ class NameNodeRpcServer implements Namen
   @Override
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
-    //TODO: will be implemented as part of HDFS-6258
+    namesystem.setXAttr(src, xAttr, flag);
   }
   
   @Override
   public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs) 
       throws IOException {
-    //TODO: will be implemented as part of HDFS-6258
-    return null;
+    return namesystem.getXAttrs(src, xAttrs);
   }
   
   @Override
   public void removeXAttr(String src, XAttr xAttr) throws IOException {
-    //TODO: will be implemented as part of HDFS-6258
+    namesystem.removeXAttr(src, xAttr);
   }
 }
 

Added: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java?rev=1591866&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java (added)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java Fri May  2 10:57:27 2014
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.security.AccessControlException;
+
+import com.google.common.collect.Lists;
+
+/**
+ * There are four types of extended attributes ({@link XAttr}) defined by the
+ * following namespaces:
+ * <br>
+ * USER - extended user attributes: these can be assigned to files and
+ * directories to store arbitrary additional information. The access
+ * permissions for user attributes are defined by the file permission
+ * bits.
+ * <br>
+ * TRUSTED - trusted extended attributes: these are visible/accessible
+ * only to/by the super user.
+ * <br>
+ * SECURITY - extended security attributes: these are used by the HDFS
+ * core for security purposes and are not available through admin/user
+ * API.
+ * <br>
+ * SYSTEM - extended system attributes: these are used by the HDFS
+ * core and are not available through admin/user API.
+ */
+@InterfaceAudience.Private
+public class XAttrPermissionFilter {
+  
+  static void checkPermissionForApi(FSPermissionChecker pc, XAttr xAttr) 
+      throws AccessControlException {
+    if (xAttr.getNameSpace() == XAttr.NameSpace.USER || 
+        (xAttr.getNameSpace() == XAttr.NameSpace.TRUSTED && 
+        pc.isSuperUser())) {
+      return;
+    }
+    throw new AccessControlException("User doesn't have permission for xattr: "
+        + xAttr.getNameSpace().toString().toLowerCase() + "." + xAttr.getName());
+  }
+  
+  static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc, 
+      List<XAttr> xAttrs) {
+    assert xAttrs != null : "xAttrs can not be null";
+    if (xAttrs == null || xAttrs.isEmpty()) {
+      return xAttrs;
+    }
+    
+    List<XAttr> filteredXAttrs = Lists.newArrayListWithCapacity(xAttrs.size());
+    for (XAttr xAttr : xAttrs) {
+      if (xAttr.getNameSpace() == XAttr.NameSpace.USER) {
+        filteredXAttrs.add(xAttr);
+      } else if (xAttr.getNameSpace() == XAttr.NameSpace.TRUSTED && 
+          pc.isSuperUser()) {
+        filteredXAttrs.add(xAttr);
+      }
+    }
+    
+    return filteredXAttrs;
+  }
+}

Added: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java?rev=1591866&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java (added)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java Fri May  2 10:57:27 2014
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * XAttrStorage reads and replaces the {@link XAttrFeature} of an inode.
+ */
+@InterfaceAudience.Private
+public class XAttrStorage {
+
+  /**
+   * Reads the extended attributes of an inode for the given snapshot id.
+   * If the inode has no <code>XAttrFeature</code> there, this method
+   * returns an empty list.
+   * @param inode INode to read
+   * @param snapshotId id handed to {@link INode#getXAttrFeature(int)}
+   * @return the <code>XAttr</code> list; empty if there is no feature.
+   */
+  public static List<XAttr> readINodeXAttrs(INode inode, int snapshotId) {
+    XAttrFeature f = inode.getXAttrFeature(snapshotId);
+    return f == null ? ImmutableList.<XAttr> of() : f.getXAttrs();
+  }
+
+  /**
+   * Reads the extended attributes of an inode via its no-arg feature getter.
+   * @param inode INode to read.
+   * @return the <code>XAttr</code> list; empty if there is no feature.
+   */
+  public static List<XAttr> readINodeXAttrs(INode inode) {
+    XAttrFeature f = inode.getXAttrFeature();
+    return f == null ? ImmutableList.<XAttr> of() : f.getXAttrs();
+  }
+
+  /**
+   * Replaces the xattrs of an inode; a null or empty list removes them all.
+   * @param inode INode to update
+   * @param xAttrs the complete new list of xattrs for the inode
+   * @param snapshotId snapshot id handed to the remove and add calls
+   */
+  public static void updateINodeXAttrs(INode inode, 
+      List<XAttr> xAttrs, int snapshotId) throws QuotaExceededException {
+    if (xAttrs == null || xAttrs.isEmpty()) {
+      if (inode.getXAttrFeature() != null) {
+        inode.removeXAttrFeature(snapshotId);
+      }
+      return;
+    }
+    // Probe the current state (not the snapshot view): that is what changes.
+    ImmutableList<XAttr> newXAttrs = ImmutableList.copyOf(xAttrs);
+    if (inode.getXAttrFeature() != null) {
+      inode.removeXAttrFeature(snapshotId);
+    }
+    inode.addXAttrFeature(new XAttrFeature(newXAttrs), snapshotId);
+  }
+}

Added: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java?rev=1591866&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java (added)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java Fri May  2 10:57:27 2014
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests NameNode interaction for all XAttr APIs.
+ */
+public class FSXAttrBaseTest {
+
+  protected static MiniDFSCluster dfsCluster;
+  protected static Configuration conf;
+  private static int pathCount = 0;
+  private static Path path;
+
+  //xattr names and values shared by the tests
+  protected static final String name1 = "user.a1";
+  protected static final byte[] value1 = {0x31, 0x32, 0x33};
+  protected static final byte[] newValue1 = {0x31, 0x31, 0x31};
+  protected static final String name2 = "user.a2";
+  protected static final byte[] value2 = {0x37, 0x38, 0x39};
+  protected static final String name3 = "user.a3";
+  protected static final String name4 = "user.a4";
+
+  protected FileSystem fs;
+
+  @AfterClass
+  public static void shutdown() {
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    pathCount += 1;
+    path = new Path("/p" + pathCount);
+    initFileSystem();
+  }
+
+  @After
+  public void destroyFileSystems() {
+    IOUtils.cleanup(null, fs);
+    fs = null;
+  }
+
+  /**
+   * Tests for creating xattr
+   * 1. create xattr using XAttrSetFlag.CREATE flag.
+   * 2. Assert exception of creating xattr which already exists.
+   * 3. Create multiple xattrs
+   */
+  @Test
+  public void testCreateXAttr() throws Exception {
+    FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
+    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+
+    Map<String, byte[]> xattrs = fs.getXAttrs(path);
+    Assert.assertEquals(1, xattrs.size());
+    Assert.assertArrayEquals(value1, xattrs.get(name1));
+
+    fs.removeXAttr(path, name1);
+
+    xattrs = fs.getXAttrs(path);
+    Assert.assertEquals(0, xattrs.size());
+
+    //create xattr which already exists.
+    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+    try {
+      fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+      Assert.fail("Creating xattr which already exists should fail.");
+    } catch (IOException e) { // expected
+    }
+    fs.removeXAttr(path, name1);
+
+    //create two xattrs; a null value is expected to read back as empty
+    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+    fs.setXAttr(path, name2, null, EnumSet.of(XAttrSetFlag.CREATE));
+    xattrs = fs.getXAttrs(path);
+    Assert.assertEquals(2, xattrs.size());
+    Assert.assertArrayEquals(value1, xattrs.get(name1));
+    Assert.assertArrayEquals(new byte[0], xattrs.get(name2));
+
+    fs.removeXAttr(path, name1);
+    fs.removeXAttr(path, name2);
+  }
+
+  /**
+   * Tests for replacing xattr
+   * 1. Replace xattr using XAttrSetFlag.REPLACE flag.
+   * 2. Assert exception of replacing xattr which does not exist.
+   * 3. Create multiple xattrs, and replace some.
+   */
+  @Test
+  public void testReplaceXAttr() throws Exception {
+    FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
+    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+    fs.setXAttr(path, name1, newValue1, EnumSet.of(XAttrSetFlag.REPLACE));
+
+    Map<String, byte[]> xattrs = fs.getXAttrs(path);
+    Assert.assertEquals(1, xattrs.size());
+    Assert.assertArrayEquals(newValue1, xattrs.get(name1));
+
+    fs.removeXAttr(path, name1);
+
+    //replace xattr which does not exist.
+    try {
+      fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.REPLACE));
+      Assert.fail("Replacing xattr which does not exist should fail.");
+    } catch (IOException e) { // expected
+    }
+
+    //create two xattrs, then replace one
+    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+    fs.setXAttr(path, name2, value2, EnumSet.of(XAttrSetFlag.CREATE));
+    fs.setXAttr(path, name2, null, EnumSet.of(XAttrSetFlag.REPLACE));
+    xattrs = fs.getXAttrs(path);
+    Assert.assertEquals(2, xattrs.size());
+    Assert.assertArrayEquals(value1, xattrs.get(name1));
+    Assert.assertArrayEquals(new byte[0], xattrs.get(name2));
+
+    fs.removeXAttr(path, name1);
+    fs.removeXAttr(path, name2);
+  }
+
+  /**
+   * Tests for setting xattr
+   * 1. Set xattr with XAttrSetFlag.CREATE|XAttrSetFlag.REPLACE flag.
+   * 2. Set xattr with illegal name
+   * 3. Set xattr without XAttrSetFlag.
+   * 4. Set xattr and total number exceeds max limit
+   */
+  @Test
+  public void testSetXAttr() throws Exception {
+    FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
+    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE, 
+        XAttrSetFlag.REPLACE));
+
+    Map<String, byte[]> xattrs = fs.getXAttrs(path);
+    Assert.assertEquals(1, xattrs.size());
+    Assert.assertArrayEquals(value1, xattrs.get(name1));
+    fs.removeXAttr(path, name1);
+
+    //set xattr with null name
+    try {
+      fs.setXAttr(path, null, value1, EnumSet.of(XAttrSetFlag.CREATE, 
+          XAttrSetFlag.REPLACE));
+      Assert.fail("Setting xattr with null name should fail.");
+    } catch (NullPointerException e) { // expected
+    }
+
+    //set xattr with empty name: "user."
+    try {
+      fs.setXAttr(path, "user.", value1, EnumSet.of(XAttrSetFlag.CREATE, 
+          XAttrSetFlag.REPLACE));
+      Assert.fail("Setting xattr with empty name should fail.");
+    } catch (HadoopIllegalArgumentException e) { // expected
+    }
+
+    //set xattr with invalid name: "a1"
+    try {
+      fs.setXAttr(path, "a1", value1, EnumSet.of(XAttrSetFlag.CREATE, 
+          XAttrSetFlag.REPLACE));
+      Assert.fail("Setting xattr with invalid name prefix or without " +
+          "name prefix should fail.");
+    } catch (HadoopIllegalArgumentException e) { // expected
+    }
+
+    //set xattr without XAttrSetFlag
+    fs.setXAttr(path, name1, value1);
+    xattrs = fs.getXAttrs(path);
+    Assert.assertEquals(1, xattrs.size());
+    Assert.assertArrayEquals(value1, xattrs.get(name1));
+    fs.removeXAttr(path, name1);
+
+    //xattr exists, and replace it using CREATE|REPLACE flag.
+    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+    fs.setXAttr(path, name1, newValue1, EnumSet.of(XAttrSetFlag.CREATE, 
+        XAttrSetFlag.REPLACE));
+
+    xattrs = fs.getXAttrs(path);
+    Assert.assertEquals(1, xattrs.size());
+    Assert.assertArrayEquals(newValue1, xattrs.get(name1));
+
+    fs.removeXAttr(path, name1);
+
+    //Total number exceeds max limit (the limit comes from the subclass conf)
+    fs.setXAttr(path, name1, value1);
+    fs.setXAttr(path, name2, value2);
+    fs.setXAttr(path, name3, null);
+    try {
+      fs.setXAttr(path, name4, null);
+      Assert.fail("Setting xattr should fail if total number of xattrs " +
+          "for inode exceeds max limit.");
+    } catch (IOException e) { // expected
+    }
+    fs.removeXAttr(path, name1);
+    fs.removeXAttr(path, name2);
+    fs.removeXAttr(path, name3);
+  }
+
+  /**
+   * Tests for getting xattr
+   * 1. To get xattr which does not exist.
+   * 2. To get multiple xattrs.
+   */
+  @Test
+  public void testGetXAttrs() throws Exception {
+    FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
+    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+    fs.setXAttr(path, name2, value2, EnumSet.of(XAttrSetFlag.CREATE));
+
+    //xattr does not exist.
+    byte[] value = fs.getXAttr(path, name3);
+    Assert.assertNull(value);
+
+    List<String> names = Lists.newArrayList();
+    names.add(name1);
+    names.add(name2);
+    names.add(name3);
+    Map<String, byte[]> xattrs = fs.getXAttrs(path, names);
+    Assert.assertEquals(2, xattrs.size());
+    Assert.assertArrayEquals(value1, xattrs.get(name1));
+    Assert.assertArrayEquals(value2, xattrs.get(name2));
+
+    fs.removeXAttr(path, name1);
+    fs.removeXAttr(path, name2);
+  }
+
+  /**
+   * Tests for removing xattr
+   * 1. Remove xattr
+   */
+  @Test
+  public void testRemoveXAttr() throws Exception {
+    FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
+    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+    fs.setXAttr(path, name2, value2, EnumSet.of(XAttrSetFlag.CREATE));
+    fs.setXAttr(path, name3, null, EnumSet.of(XAttrSetFlag.CREATE));
+
+    fs.removeXAttr(path, name1);
+    fs.removeXAttr(path, name2);
+
+    Map<String, byte[]> xattrs = fs.getXAttrs(path);
+    Assert.assertEquals(1, xattrs.size());
+    Assert.assertArrayEquals(new byte[0], xattrs.get(name3));
+
+    fs.removeXAttr(path, name3);
+  }
+
+  /**
+   * Creates a FileSystem for the super-user.
+   *
+   * @return FileSystem for super-user
+   * @throws Exception if creation fails
+   */
+  protected FileSystem createFileSystem() throws Exception {
+    return dfsCluster.getFileSystem();
+  }
+
+  /**
+   * Initializes all FileSystem instances used in the tests.
+   *
+   * @throws Exception if initialization fails
+   */
+  private void initFileSystem() throws Exception {
+    fs = createFileSystem();
+  }
+
+  /**
+   * Initialize the cluster from the static conf and wait for it to become
+   * active.
+   *
+   * @param format if true, format the NameNode and DataNodes before starting up
+   * @throws Exception if any step fails
+   */
+  protected static void initCluster(boolean format) throws Exception {
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(format)
+      .build();
+    dfsCluster.waitActive();
+  }
+}

Added: hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeXAttr.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeXAttr.java?rev=1591866&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeXAttr.java (added)
+++ hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeXAttr.java Fri May  2 10:57:27 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.junit.BeforeClass;
+
+/**
+ * Tests NameNode interaction for all XAttr APIs.
+ */
+public class TestNameNodeXAttr extends FSXAttrBaseTest {
+  /** Enables xattrs with a max limit of 3, then starts the cluster. */
+  @BeforeClass
+  public static void init() throws Exception {
+    final Configuration xattrConf = new Configuration();
+    xattrConf.setInt(DFSConfigKeys.DFS_NAMENODE_INODE_XATTRS_MAX_LIMIT_KEY, 3);
+    xattrConf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
+    conf = xattrConf;
+    initCluster(true);
+  }
+}