You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/12/09 04:31:34 UTC

[40/41] hadoop git commit: HDFS-7486. Consolidate XAttr-related implementation into a single class. Contributed by Haohui Mai.

HDFS-7486. Consolidate XAttr-related implementation into a single class. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6c5bbd7a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6c5bbd7a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6c5bbd7a

Branch: refs/heads/YARN-2139
Commit: 6c5bbd7a42d1e8b4416fd8870fd60c67867b35c9
Parents: 57cb43b
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Dec 8 11:52:21 2014 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Dec 8 11:52:21 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/namenode/EncryptionZoneManager.java  |   3 +-
 .../hdfs/server/namenode/FSDirXAttrOp.java      | 460 +++++++++++++++++++
 .../hdfs/server/namenode/FSDirectory.java       | 295 ++----------
 .../hdfs/server/namenode/FSEditLogLoader.java   |  19 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 227 +--------
 .../hdfs/server/namenode/TestFSDirectory.java   |  47 +-
 7 files changed, 554 insertions(+), 500 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c5bbd7a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fabb98f..55026a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -444,6 +444,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7384. 'getfacl' command and 'getAclStatus' output should be in sync.
     (Vinayakumar B via cnauroth)
 
+    HDFS-7486. Consolidate XAttr-related implementation into a single class.
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c5bbd7a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
index 135979f..faab1f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
@@ -311,7 +311,8 @@ public class EncryptionZoneManager {
     xattrs.add(ezXAttr);
     // updating the xattr will call addEncryptionZone,
     // done this way to handle edit log loading
-    dir.unprotectedSetXAttrs(src, xattrs, EnumSet.of(XAttrSetFlag.CREATE));
+    FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
+                                      EnumSet.of(XAttrSetFlag.CREATE));
     return ezXAttr;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c5bbd7a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
new file mode 100644
index 0000000..303b9e3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.XAttrHelper;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.security.AccessControlException;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.ListIterator;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
+
+class FSDirXAttrOp {
+  private static final XAttr KEYID_XATTR =
+      XAttrHelper.buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, null);
+  private static final XAttr UNREADABLE_BY_SUPERUSER_XATTR =
+      XAttrHelper.buildXAttr(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER, null);
+
+  /**
+   * Set xattr for a file or directory.
+   *
+   * @param src
+   *          - path on which it sets the xattr
+   * @param xAttr
+   *          - xAttr details to set
+   * @param flag
+   *          - xAttrs flags
+   * @throws IOException
+   */
+  static HdfsFileStatus setXAttr(
+      FSDirectory fsd, String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
+      boolean logRetryCache)
+      throws IOException {
+    checkXAttrsConfigFlag(fsd);
+    checkXAttrSize(fsd, xAttr);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    XAttrPermissionFilter.checkPermissionForApi(
+        pc, xAttr, FSDirectory.isReservedRawName(src));
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(
+        src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    final INodesInPath iip = fsd.getINodesInPath4Write(src);
+    checkXAttrChangeAccess(fsd, iip, xAttr, pc);
+    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+    xAttrs.add(xAttr);
+    fsd.writeLock();
+    try {
+      unprotectedSetXAttrs(fsd, src, xAttrs, flag);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static List<XAttr> getXAttrs(FSDirectory fsd, final String srcArg,
+                               List<XAttr> xAttrs)
+      throws IOException {
+    String src = srcArg;
+    checkXAttrsConfigFlag(fsd);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    final boolean isRawPath = FSDirectory.isReservedRawName(src);
+    boolean getAll = xAttrs == null || xAttrs.isEmpty();
+    if (!getAll) {
+      XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs, isRawPath);
+    }
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    final INodesInPath iip = fsd.getINodesInPath(src, true);
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkPathAccess(pc, iip, FsAction.READ);
+    }
+    List<XAttr> all = FSDirXAttrOp.getXAttrs(fsd, src);
+    List<XAttr> filteredAll = XAttrPermissionFilter.
+        filterXAttrsForApi(pc, all, isRawPath);
+
+    if (getAll) {
+      return filteredAll;
+    }
+    if (filteredAll == null || filteredAll.isEmpty()) {
+      return null;
+    }
+    List<XAttr> toGet = Lists.newArrayListWithCapacity(xAttrs.size());
+    for (XAttr xAttr : xAttrs) {
+      boolean foundIt = false;
+      for (XAttr a : filteredAll) {
+        if (xAttr.getNameSpace() == a.getNameSpace() && xAttr.getName().equals(
+            a.getName())) {
+          toGet.add(a);
+          foundIt = true;
+          break;
+        }
+      }
+      if (!foundIt) {
+        throw new IOException(
+            "At least one of the attributes provided was not found.");
+      }
+    }
+    return toGet;
+  }
+
+  static List<XAttr> listXAttrs(
+      FSDirectory fsd, String src) throws IOException {
+    FSDirXAttrOp.checkXAttrsConfigFlag(fsd);
+    final FSPermissionChecker pc = fsd.getPermissionChecker();
+    final boolean isRawPath = FSDirectory.isReservedRawName(src);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    final INodesInPath iip = fsd.getINodesInPath(src, true);
+    if (fsd.isPermissionEnabled()) {
+      /* To access xattr names, you need EXECUTE in the owning directory. */
+      fsd.checkParentAccess(pc, iip, FsAction.EXECUTE);
+    }
+    final List<XAttr> all = FSDirXAttrOp.getXAttrs(fsd, src);
+    return XAttrPermissionFilter.
+        filterXAttrsForApi(pc, all, isRawPath);
+  }
+
+  /**
+   * Remove an xattr for a file or directory.
+   *
+   * @param src
+   *          - path to remove the xattr from
+   * @param xAttr
+   *          - xAttr to remove
+   * @throws IOException
+   */
+  static HdfsFileStatus removeXAttr(
+      FSDirectory fsd, String src, XAttr xAttr, boolean logRetryCache)
+      throws IOException {
+    FSDirXAttrOp.checkXAttrsConfigFlag(fsd);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    XAttrPermissionFilter.checkPermissionForApi(
+        pc, xAttr, FSDirectory.isReservedRawName(src));
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(
+        src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    final INodesInPath iip = fsd.getINodesInPath4Write(src);
+    checkXAttrChangeAccess(fsd, iip, xAttr, pc);
+
+    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+    xAttrs.add(xAttr);
+    fsd.writeLock();
+    try {
+      List<XAttr> removedXAttrs = unprotectedRemoveXAttrs(fsd, src, xAttrs);
+      if (removedXAttrs != null && !removedXAttrs.isEmpty()) {
+        fsd.getEditLog().logRemoveXAttrs(src, removedXAttrs, logRetryCache);
+      } else {
+        throw new IOException(
+            "No matching attributes found for remove operation");
+      }
+    } finally {
+      fsd.writeUnlock();
+    }
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static List<XAttr> unprotectedRemoveXAttrs(
+      FSDirectory fsd, final String src, final List<XAttr> toRemove)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath iip = fsd.getINodesInPath4Write(
+        FSDirectory.normalizePath(src), true);
+    INode inode = FSDirectory.resolveLastINode(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+    List<XAttr> removedXAttrs = Lists.newArrayListWithCapacity(toRemove.size());
+    List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove,
+                                              removedXAttrs);
+    if (existingXAttrs.size() != newXAttrs.size()) {
+      XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
+      return removedXAttrs;
+    }
+    return null;
+  }
+
+  /**
+   * Filter XAttrs from a list of existing XAttrs. Removes matched XAttrs from
+   * toFilter and puts them into filtered. Upon completion,
+   * toFilter contains the filter XAttrs that were not found, while
+   * filtered contains the XAttrs that were found.
+   *
+   * @param existingXAttrs Existing XAttrs to be filtered
+   * @param toFilter XAttrs to filter from the existing XAttrs
+   * @param filtered Return parameter, XAttrs that were filtered
+   * @return List of XAttrs that does not contain filtered XAttrs
+   */
+  @VisibleForTesting
+  static List<XAttr> filterINodeXAttrs(
+      final List<XAttr> existingXAttrs, final List<XAttr> toFilter,
+      final List<XAttr> filtered)
+    throws AccessControlException {
+    if (existingXAttrs == null || existingXAttrs.isEmpty() ||
+        toFilter == null || toFilter.isEmpty()) {
+      return existingXAttrs;
+    }
+
+    // Populate a new list with XAttrs that pass the filter
+    List<XAttr> newXAttrs =
+        Lists.newArrayListWithCapacity(existingXAttrs.size());
+    for (XAttr a : existingXAttrs) {
+      boolean add = true;
+      for (ListIterator<XAttr> it = toFilter.listIterator(); it.hasNext()
+          ;) {
+        XAttr filter = it.next();
+        Preconditions.checkArgument(
+            !KEYID_XATTR.equalsIgnoreValue(filter),
+            "The encryption zone xattr should never be deleted.");
+        if (UNREADABLE_BY_SUPERUSER_XATTR.equalsIgnoreValue(filter)) {
+          throw new AccessControlException("The xattr '" +
+              SECURITY_XATTR_UNREADABLE_BY_SUPERUSER + "' can not be deleted.");
+        }
+        if (a.equalsIgnoreValue(filter)) {
+          add = false;
+          it.remove();
+          filtered.add(filter);
+          break;
+        }
+      }
+      if (add) {
+        newXAttrs.add(a);
+      }
+    }
+
+    return newXAttrs;
+  }
+
+  static INode unprotectedSetXAttrs(
+      FSDirectory fsd, final String src, final List<XAttr> xAttrs,
+      final EnumSet<XAttrSetFlag> flag)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath(src), true);
+    INode inode = FSDirectory.resolveLastINode(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+    List<XAttr> newXAttrs = setINodeXAttrs(fsd, existingXAttrs, xAttrs, flag);
+    final boolean isFile = inode.isFile();
+
+    for (XAttr xattr : newXAttrs) {
+      final String xaName = XAttrHelper.getPrefixName(xattr);
+
+      /*
+       * If we're adding the encryption zone xattr, then add src to the list
+       * of encryption zones.
+       */
+      if (CRYPTO_XATTR_ENCRYPTION_ZONE.equals(xaName)) {
+        final HdfsProtos.ZoneEncryptionInfoProto ezProto =
+            HdfsProtos.ZoneEncryptionInfoProto.parseFrom(xattr.getValue());
+        fsd.ezManager.addEncryptionZone(inode.getId(),
+                                        PBHelper.convert(ezProto.getSuite()),
+                                        PBHelper.convert(
+                                            ezProto.getCryptoProtocolVersion()),
+                                        ezProto.getKeyName());
+      }
+
+      if (!isFile && SECURITY_XATTR_UNREADABLE_BY_SUPERUSER.equals(xaName)) {
+        throw new IOException("Can only set '" +
+            SECURITY_XATTR_UNREADABLE_BY_SUPERUSER + "' on a file.");
+      }
+    }
+
+    XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
+    return inode;
+  }
+
+  static List<XAttr> setINodeXAttrs(
+      FSDirectory fsd, final List<XAttr> existingXAttrs,
+      final List<XAttr> toSet, final EnumSet<XAttrSetFlag> flag)
+      throws IOException {
+    // Check for duplicate XAttrs in toSet
+    // We need to use a custom comparator, so using a HashSet is not suitable
+    for (int i = 0; i < toSet.size(); i++) {
+      for (int j = i + 1; j < toSet.size(); j++) {
+        if (toSet.get(i).equalsIgnoreValue(toSet.get(j))) {
+          throw new IOException("Cannot specify the same XAttr to be set " +
+              "more than once");
+        }
+      }
+    }
+
+    // Count the current number of user-visible XAttrs for limit checking
+    int userVisibleXAttrsNum = 0; // Number of user visible xAttrs
+
+    // The XAttr list is copied to an exactly-sized array when it's stored,
+    // so there's no need to size it precisely here.
+    int newSize = (existingXAttrs != null) ? existingXAttrs.size() : 0;
+    newSize += toSet.size();
+    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(newSize);
+
+    // Check if the XAttr already exists to validate with the provided flag
+    for (XAttr xAttr: toSet) {
+      boolean exist = false;
+      if (existingXAttrs != null) {
+        for (XAttr a : existingXAttrs) {
+          if (a.equalsIgnoreValue(xAttr)) {
+            exist = true;
+            break;
+          }
+        }
+      }
+      XAttrSetFlag.validate(xAttr.getName(), exist, flag);
+      // add the new XAttr since it passed validation
+      xAttrs.add(xAttr);
+      if (isUserVisible(xAttr)) {
+        userVisibleXAttrsNum++;
+      }
+    }
+
+    // Add the existing xattrs back in, if they weren't already set
+    if (existingXAttrs != null) {
+      for (XAttr existing : existingXAttrs) {
+        boolean alreadySet = false;
+        for (XAttr set : toSet) {
+          if (set.equalsIgnoreValue(existing)) {
+            alreadySet = true;
+            break;
+          }
+        }
+        if (!alreadySet) {
+          xAttrs.add(existing);
+          if (isUserVisible(existing)) {
+            userVisibleXAttrsNum++;
+          }
+        }
+      }
+    }
+
+    if (userVisibleXAttrsNum > fsd.getInodeXAttrsLimit()) {
+      throw new IOException("Cannot add additional XAttr to inode, "
+          + "would exceed limit of " + fsd.getInodeXAttrsLimit());
+    }
+
+    return xAttrs;
+  }
+
+  static List<XAttr> getXAttrs(FSDirectory fsd, INode inode, int snapshotId)
+      throws IOException {
+    fsd.readLock();
+    try {
+      return XAttrStorage.readINodeXAttrs(inode, snapshotId);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  static XAttr unprotectedGetXAttrByName(
+      INode inode, int snapshotId, String xAttrName)
+      throws IOException {
+    List<XAttr> xAttrs = XAttrStorage.readINodeXAttrs(inode, snapshotId);
+    if (xAttrs == null) {
+      return null;
+    }
+    for (XAttr x : xAttrs) {
+      if (XAttrHelper.getPrefixName(x)
+          .equals(xAttrName)) {
+        return x;
+      }
+    }
+    return null;
+  }
+
+  private static void checkXAttrChangeAccess(
+      FSDirectory fsd, INodesInPath iip, XAttr xAttr,
+      FSPermissionChecker pc)
+      throws AccessControlException {
+    if (fsd.isPermissionEnabled() && xAttr.getNameSpace() == XAttr.NameSpace
+        .USER) {
+      final INode inode = iip.getLastINode();
+      if (inode != null &&
+          inode.isDirectory() &&
+          inode.getFsPermission().getStickyBit()) {
+        if (!pc.isSuperUser()) {
+          fsd.checkOwner(pc, iip);
+        }
+      } else {
+        fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+      }
+    }
+  }
+
+  /**
+   * Verifies that the combined size of the name and value of an xattr is within
+   * the configured limit. Setting a limit of zero disables this check.
+   */
+  private static void checkXAttrSize(FSDirectory fsd, XAttr xAttr) {
+    if (fsd.getXattrMaxSize() == 0) {
+      return;
+    }
+    int size = xAttr.getName().getBytes(Charsets.UTF_8).length;
+    if (xAttr.getValue() != null) {
+      size += xAttr.getValue().length;
+    }
+    if (size > fsd.getXattrMaxSize()) {
+      throw new HadoopIllegalArgumentException(
+          "The XAttr is too big. The maximum combined size of the"
+          + " name and value is " + fsd.getXattrMaxSize()
+          + ", but the total size is " + size);
+    }
+  }
+
+  private static void checkXAttrsConfigFlag(FSDirectory fsd) throws
+                                                             IOException {
+    if (!fsd.isXattrsEnabled()) {
+      throw new IOException(String.format(
+          "The XAttr operation has been rejected.  "
+              + "Support for XAttrs has been disabled by setting %s to false.",
+          DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY));
+    }
+  }
+
+  private static List<XAttr> getXAttrs(FSDirectory fsd,
+                                String src) throws IOException {
+    String srcs = FSDirectory.normalizePath(src);
+    fsd.readLock();
+    try {
+      INodesInPath iip = fsd.getLastINodeInPath(srcs, true);
+      INode inode = FSDirectory.resolveLastINode(src, iip);
+      int snapshotId = iip.getPathSnapshotId();
+      return XAttrStorage.readINodeXAttrs(inode, snapshotId);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  private static boolean isUserVisible(XAttr xAttr) {
+    XAttr.NameSpace ns = xAttr.getNameSpace();
+    return ns == XAttr.NameSpace.USER || ns == XAttr.NameSpace.TRUSTED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c5bbd7a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index aee79af..e802627 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.namenode;
 import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.Closeable;
@@ -30,7 +29,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -122,10 +120,6 @@ public class FSDirectory implements Closeable {
   public final static String DOT_INODES_STRING = ".inodes";
   public final static byte[] DOT_INODES = 
       DFSUtil.string2Bytes(DOT_INODES_STRING);
-  private final XAttr KEYID_XATTR =
-      XAttrHelper.buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, null);
-  private final XAttr UNREADABLE_BY_SUPERUSER_XATTR =
-      XAttrHelper.buildXAttr(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER, null);
 
   INodeDirectory rootDir;
   private final FSNamesystem namesystem;
@@ -136,6 +130,7 @@ public class FSDirectory implements Closeable {
   private final int contentCountLimit; // max content summary counts per run
   private final INodeMap inodeMap; // Synchronized by dirLock
   private long yieldCount = 0; // keep track of lock yield count.
+
   private final int inodeXAttrsLimit; //inode xattrs max limit
 
   // lock to protect the directory and BlockMap
@@ -148,6 +143,8 @@ public class FSDirectory implements Closeable {
    * ACL-related operations.
    */
   private final boolean aclsEnabled;
+  private final boolean xattrsEnabled;
+  private final int xattrMaxSize;
   private final String fsOwnerShortUserName;
   private final String supergroup;
   private final INodeId inodeId;
@@ -213,6 +210,18 @@ public class FSDirectory implements Closeable {
         DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
         DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT);
     LOG.info("ACLs enabled? " + aclsEnabled);
+    this.xattrsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY,
+        DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_DEFAULT);
+    LOG.info("XAttrs enabled? " + xattrsEnabled);
+    this.xattrMaxSize = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAX_XATTR_SIZE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_XATTR_SIZE_DEFAULT);
+    Preconditions.checkArgument(xattrMaxSize >= 0,
+                                "Cannot set a negative value for the maximum size of an xattr (%s).",
+                                DFSConfigKeys.DFS_NAMENODE_MAX_XATTR_SIZE_KEY);
+    final String unlimited = xattrMaxSize == 0 ? " (unlimited)" : "";
+    LOG.info("Maximum size of an xattr: " + xattrMaxSize + unlimited);
     int configuredLimit = conf.getInt(
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
     this.lsLimit = configuredLimit>0 ?
@@ -274,6 +283,10 @@ public class FSDirectory implements Closeable {
   boolean isAclsEnabled() {
     return aclsEnabled;
   }
+  boolean isXattrsEnabled() {
+    return xattrsEnabled;
+  }
+  int getXattrMaxSize() { return xattrMaxSize; }
 
   int getLsLimit() {
     return lsLimit;
@@ -283,6 +296,10 @@ public class FSDirectory implements Closeable {
     return contentCountLimit;
   }
 
+  int getInodeXAttrsLimit() {
+    return inodeXAttrsLimit;
+  }
+
   FSEditLog getEditLog() {
     return editLog;
   }
@@ -613,8 +630,11 @@ public class FSDirectory implements Closeable {
       int latestSnapshotId) throws IOException {
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
     XAttr xAttr = BlockStoragePolicySuite.buildXAttr(policyId);
-    List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, Arrays.asList(xAttr),
-        EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
+    List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(this, existingXAttrs,
+                                                        Arrays.asList(xAttr),
+                                                        EnumSet.of(
+                                                            XAttrSetFlag.CREATE,
+                                                            XAttrSetFlag.REPLACE));
     XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
   }
 
@@ -1560,90 +1580,6 @@ public class FSDirectory implements Closeable {
     return addINode(path, symlink) ? symlink : null;
   }
 
-  /**
-   * Removes a list of XAttrs from an inode at a path.
-   *
-   * @param src path of inode
-   * @param toRemove XAttrs to be removed
-   * @return List of XAttrs that were removed
-   * @throws IOException if the inode does not exist, if quota is exceeded
-   */
-  List<XAttr> removeXAttrs(final String src, final List<XAttr> toRemove)
-      throws IOException {
-    writeLock();
-    try {
-      return unprotectedRemoveXAttrs(src, toRemove);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  List<XAttr> unprotectedRemoveXAttrs(final String src,
-      final List<XAttr> toRemove) throws IOException {
-    assert hasWriteLock();
-    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
-    INode inode = resolveLastINode(src, iip);
-    int snapshotId = iip.getLatestSnapshotId();
-    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
-    List<XAttr> removedXAttrs = Lists.newArrayListWithCapacity(toRemove.size());
-    List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove,
-        removedXAttrs);
-    if (existingXAttrs.size() != newXAttrs.size()) {
-      XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
-      return removedXAttrs;
-    }
-    return null;
-  }
-
-  /**
-   * Filter XAttrs from a list of existing XAttrs. Removes matched XAttrs from
-   * toFilter and puts them into filtered. Upon completion,
-   * toFilter contains the filter XAttrs that were not found, while
-   * fitleredXAttrs contains the XAttrs that were found.
-   *
-   * @param existingXAttrs Existing XAttrs to be filtered
-   * @param toFilter XAttrs to filter from the existing XAttrs
-   * @param filtered Return parameter, XAttrs that were filtered
-   * @return List of XAttrs that does not contain filtered XAttrs
-   */
-  @VisibleForTesting
-  List<XAttr> filterINodeXAttrs(final List<XAttr> existingXAttrs,
-      final List<XAttr> toFilter, final List<XAttr> filtered)
-    throws AccessControlException {
-    if (existingXAttrs == null || existingXAttrs.isEmpty() ||
-        toFilter == null || toFilter.isEmpty()) {
-      return existingXAttrs;
-    }
-
-    // Populate a new list with XAttrs that pass the filter
-    List<XAttr> newXAttrs =
-        Lists.newArrayListWithCapacity(existingXAttrs.size());
-    for (XAttr a : existingXAttrs) {
-      boolean add = true;
-      for (ListIterator<XAttr> it = toFilter.listIterator(); it.hasNext()
-          ;) {
-        XAttr filter = it.next();
-        Preconditions.checkArgument(!KEYID_XATTR.equalsIgnoreValue(filter),
-            "The encryption zone xattr should never be deleted.");
-        if (UNREADABLE_BY_SUPERUSER_XATTR.equalsIgnoreValue(filter)) {
-          throw new AccessControlException("The xattr '" +
-              SECURITY_XATTR_UNREADABLE_BY_SUPERUSER + "' can not be deleted.");
-        }
-        if (a.equalsIgnoreValue(filter)) {
-          add = false;
-          it.remove();
-          filtered.add(filter);
-          break;
-        }
-      }
-      if (add) {
-        newXAttrs.add(a);
-      }
-    }
-
-    return newXAttrs;
-  }
-
   boolean isInAnEZ(INodesInPath iip)
       throws UnresolvedLinkException, SnapshotAccessControlException {
     readLock();
@@ -1709,7 +1645,8 @@ public class FSDirectory implements Closeable {
 
     writeLock();
     try {
-      unprotectedSetXAttrs(src, xAttrs, EnumSet.of(XAttrSetFlag.CREATE));
+      FSDirXAttrOp.unprotectedSetXAttrs(this, src, xAttrs,
+                                        EnumSet.of(XAttrSetFlag.CREATE));
     } finally {
       writeUnlock();
     }
@@ -1752,8 +1689,9 @@ public class FSDirectory implements Closeable {
       final CipherSuite suite = encryptionZone.getSuite();
       final String keyName = encryptionZone.getKeyName();
 
-      XAttr fileXAttr = unprotectedGetXAttrByName(inode, snapshotId,
-          CRYPTO_XATTR_FILE_ENCRYPTION_INFO);
+      XAttr fileXAttr = FSDirXAttrOp.unprotectedGetXAttrByName(inode,
+                                                               snapshotId,
+                                                               CRYPTO_XATTR_FILE_ENCRYPTION_INFO);
 
       if (fileXAttr == null) {
         NameNode.LOG.warn("Could not find encryption XAttr for file " +
@@ -1775,173 +1713,6 @@ public class FSDirectory implements Closeable {
     }
   }
 
-  void setXAttrs(final String src, final List<XAttr> xAttrs,
-      final EnumSet<XAttrSetFlag> flag) throws IOException {
-    writeLock();
-    try {
-      unprotectedSetXAttrs(src, xAttrs, flag);
-    } finally {
-      writeUnlock();
-    }
-  }
-  
-  INode unprotectedSetXAttrs(final String src, final List<XAttr> xAttrs,
-      final EnumSet<XAttrSetFlag> flag)
-      throws QuotaExceededException, IOException {
-    assert hasWriteLock();
-    INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
-    INode inode = resolveLastINode(src, iip);
-    int snapshotId = iip.getLatestSnapshotId();
-    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
-    List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, xAttrs, flag);
-    final boolean isFile = inode.isFile();
-
-    for (XAttr xattr : newXAttrs) {
-      final String xaName = XAttrHelper.getPrefixName(xattr);
-
-      /*
-       * If we're adding the encryption zone xattr, then add src to the list
-       * of encryption zones.
-       */
-      if (CRYPTO_XATTR_ENCRYPTION_ZONE.equals(xaName)) {
-        final HdfsProtos.ZoneEncryptionInfoProto ezProto =
-            HdfsProtos.ZoneEncryptionInfoProto.parseFrom(xattr.getValue());
-        ezManager.addEncryptionZone(inode.getId(),
-            PBHelper.convert(ezProto.getSuite()),
-            PBHelper.convert(ezProto.getCryptoProtocolVersion()),
-            ezProto.getKeyName());
-      }
-
-      if (!isFile && SECURITY_XATTR_UNREADABLE_BY_SUPERUSER.equals(xaName)) {
-        throw new IOException("Can only set '" +
-            SECURITY_XATTR_UNREADABLE_BY_SUPERUSER + "' on a file.");
-      }
-    }
-
-    XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
-    return inode;
-  }
-
-  List<XAttr> setINodeXAttrs(final List<XAttr> existingXAttrs,
-      final List<XAttr> toSet, final EnumSet<XAttrSetFlag> flag)
-      throws IOException {
-    // Check for duplicate XAttrs in toSet
-    // We need to use a custom comparator, so using a HashSet is not suitable
-    for (int i = 0; i < toSet.size(); i++) {
-      for (int j = i + 1; j < toSet.size(); j++) {
-        if (toSet.get(i).equalsIgnoreValue(toSet.get(j))) {
-          throw new IOException("Cannot specify the same XAttr to be set " +
-              "more than once");
-        }
-      }
-    }
-
-    // Count the current number of user-visible XAttrs for limit checking
-    int userVisibleXAttrsNum = 0; // Number of user visible xAttrs
-
-    // The XAttr list is copied to an exactly-sized array when it's stored,
-    // so there's no need to size it precisely here.
-    int newSize = (existingXAttrs != null) ? existingXAttrs.size() : 0;
-    newSize += toSet.size();
-    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(newSize);
-
-    // Check if the XAttr already exists to validate with the provided flag
-    for (XAttr xAttr: toSet) {
-      boolean exist = false;
-      if (existingXAttrs != null) {
-        for (XAttr a : existingXAttrs) {
-          if (a.equalsIgnoreValue(xAttr)) {
-            exist = true;
-            break;
-          }
-        }
-      }
-      XAttrSetFlag.validate(xAttr.getName(), exist, flag);
-      // add the new XAttr since it passed validation
-      xAttrs.add(xAttr);
-      if (isUserVisible(xAttr)) {
-        userVisibleXAttrsNum++;
-      }
-    }
-
-    // Add the existing xattrs back in, if they weren't already set
-    if (existingXAttrs != null) {
-      for (XAttr existing : existingXAttrs) {
-        boolean alreadySet = false;
-        for (XAttr set : toSet) {
-          if (set.equalsIgnoreValue(existing)) {
-            alreadySet = true;
-            break;
-          }
-        }
-        if (!alreadySet) {
-          xAttrs.add(existing);
-          if (isUserVisible(existing)) {
-            userVisibleXAttrsNum++;
-          }
-        }
-      }
-    }
-
-    if (userVisibleXAttrsNum > inodeXAttrsLimit) {
-      throw new IOException("Cannot add additional XAttr to inode, "
-          + "would exceed limit of " + inodeXAttrsLimit);
-    }
-
-    return xAttrs;
-  }
-  
-  private boolean isUserVisible(XAttr xAttr) {
-    if (xAttr.getNameSpace() == XAttr.NameSpace.USER || 
-        xAttr.getNameSpace() == XAttr.NameSpace.TRUSTED) {
-      return true;
-    }
-    return false;
-  }
-  
-  List<XAttr> getXAttrs(String src) throws IOException {
-    String srcs = normalizePath(src);
-    readLock();
-    try {
-      INodesInPath iip = getLastINodeInPath(srcs, true);
-      INode inode = resolveLastINode(src, iip);
-      int snapshotId = iip.getPathSnapshotId();
-      return unprotectedGetXAttrs(inode, snapshotId);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  List<XAttr> getXAttrs(INode inode, int snapshotId) throws IOException {
-    readLock();
-    try {
-      return unprotectedGetXAttrs(inode, snapshotId);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  private List<XAttr> unprotectedGetXAttrs(INode inode, int snapshotId)
-      throws IOException {
-    return XAttrStorage.readINodeXAttrs(inode, snapshotId);
-  }
-
-  private XAttr unprotectedGetXAttrByName(INode inode, int snapshotId,
-      String xAttrName)
-      throws IOException {
-    List<XAttr> xAttrs = XAttrStorage.readINodeXAttrs(inode, snapshotId);
-    if (xAttrs == null) {
-      return null;
-    }
-    for (XAttr x : xAttrs) {
-      if (XAttrHelper.getPrefixName(x)
-          .equals(xAttrName)) {
-        return x;
-      }
-    }
-    return null;
-  }
-
   static INode resolveLastINode(String src, INodesInPath iip)
       throws FileNotFoundException {
     INode[] inodes = iip.getINodes();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c5bbd7a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index d63545b..d12ae15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -537,7 +537,8 @@ public class FSEditLogLoader {
     }
     case OP_SET_GENSTAMP_V1: {
       SetGenstampV1Op setGenstampV1Op = (SetGenstampV1Op)op;
-      fsNamesys.getBlockIdManager().setGenerationStampV1(setGenstampV1Op.genStampV1);
+      fsNamesys.getBlockIdManager().setGenerationStampV1(
+          setGenstampV1Op.genStampV1);
       break;
     }
     case OP_SET_PERMISSIONS: {
@@ -730,12 +731,14 @@ public class FSEditLogLoader {
     }
     case OP_SET_GENSTAMP_V2: {
       SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
-      fsNamesys.getBlockIdManager().setGenerationStampV2(setGenstampV2Op.genStampV2);
+      fsNamesys.getBlockIdManager().setGenerationStampV2(
+          setGenstampV2Op.genStampV2);
       break;
     }
     case OP_ALLOCATE_BLOCK_ID: {
       AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
-      fsNamesys.getBlockIdManager().setLastAllocatedBlockId(allocateBlockIdOp.blockId);
+      fsNamesys.getBlockIdManager().setLastAllocatedBlockId(
+          allocateBlockIdOp.blockId);
       break;
     }
     case OP_ROLLING_UPGRADE_START: {
@@ -828,8 +831,10 @@ public class FSEditLogLoader {
     }
     case OP_SET_XATTR: {
       SetXAttrOp setXAttrOp = (SetXAttrOp) op;
-      fsDir.unprotectedSetXAttrs(setXAttrOp.src, setXAttrOp.xAttrs,
-          EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
+      FSDirXAttrOp.unprotectedSetXAttrs(fsDir, setXAttrOp.src,
+                                        setXAttrOp.xAttrs,
+                                        EnumSet.of(XAttrSetFlag.CREATE,
+                                                   XAttrSetFlag.REPLACE));
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(setXAttrOp.rpcClientId, setXAttrOp.rpcCallId);
       }
@@ -837,8 +842,8 @@ public class FSEditLogLoader {
     }
     case OP_REMOVE_XATTR: {
       RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op;
-      fsDir.unprotectedRemoveXAttrs(removeXAttrOp.src,
-          removeXAttrOp.xAttrs);
+      FSDirXAttrOp.unprotectedRemoveXAttrs(fsDir, removeXAttrOp.src,
+                                           removeXAttrOp.xAttrs);
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId,
             removeXAttrOp.rpcCallId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c5bbd7a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e9ce78c..2b530fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -532,9 +532,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private final RetryCache retryCache;
 
-  private final boolean xattrsEnabled;
-  private final int xattrMaxSize;
-
   private KeyProviderCryptoExtension provider = null;
 
   private volatile boolean imageLoaded = false;
@@ -849,19 +846,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
       this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
-
-      this.xattrsEnabled = conf.getBoolean(
-          DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY,
-          DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_DEFAULT);
-      LOG.info("XAttrs enabled? " + xattrsEnabled);
-      this.xattrMaxSize = conf.getInt(
-          DFSConfigKeys.DFS_NAMENODE_MAX_XATTR_SIZE_KEY,
-          DFSConfigKeys.DFS_NAMENODE_MAX_XATTR_SIZE_DEFAULT);
-      Preconditions.checkArgument(xattrMaxSize >= 0,
-          "Cannot set a negative value for the maximum size of an xattr (%s).",
-          DFSConfigKeys.DFS_NAMENODE_MAX_XATTR_SIZE_KEY);
-      final String unlimited = xattrMaxSize == 0 ? " (unlimited)" : "";
-      LOG.info("Maximum size of an xattr: " + xattrMaxSize + unlimited);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -5827,7 +5811,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       INode inode, int snapshotId)
       throws IOException {
     if (pc.isSuperUser()) {
-      for (XAttr xattr : dir.getXAttrs(inode, snapshotId)) {
+      for (XAttr xattr : FSDirXAttrOp.getXAttrs(dir, inode, snapshotId)) {
         if (XAttrHelper.getPrefixName(xattr).
             equals(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER)) {
           throw new AccessControlException("Access is denied for " +
@@ -7967,136 +7951,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
-  /**
-   * Set xattr for a file or directory.
-   * 
-   * @param src
-   *          - path on which it sets the xattr
-   * @param xAttr
-   *          - xAttr details to set
-   * @param flag
-   *          - xAttrs flags
-   * @throws AccessControlException
-   * @throws SafeModeException
-   * @throws UnresolvedLinkException
-   * @throws IOException
-   */
   void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
                 boolean logRetryCache)
-      throws AccessControlException, SafeModeException,
-      UnresolvedLinkException, IOException {
-    try {
-      setXAttrInt(src, xAttr, flag, logRetryCache);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "setXAttr", src);
-      throw e;
-    }
-  }
-  
-  private void setXAttrInt(final String srcArg, XAttr xAttr,
-      EnumSet<XAttrSetFlag> flag, boolean logRetryCache) throws IOException {
-    String src = srcArg;
-    checkXAttrsConfigFlag();
-    checkXAttrSize(xAttr);
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
-    XAttrPermissionFilter.checkPermissionForApi(pc, xAttr,
-        FSDirectory.isReservedRawName(src));
+      throws IOException {
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    HdfsFileStatus auditStat = null;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set XAttr on " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      checkXAttrChangeAccess(iip, xAttr, pc);
-      List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
-      xAttrs.add(xAttr);
-      dir.setXAttrs(src, xAttrs, flag);
-      getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
-      resultingStat = getAuditFileInfo(src, false);
+      auditStat = FSDirXAttrOp.setXAttr(dir, src, xAttr, flag, logRetryCache);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setXAttr", src);
+      throw e;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "setXAttr", srcArg, null, resultingStat);
+    logAuditEvent(true, "setXAttr", src, null, auditStat);
   }
 
-  /**
-   * Verifies that the combined size of the name and value of an xattr is within
-   * the configured limit. Setting a limit of zero disables this check.
-   */
-  private void checkXAttrSize(XAttr xAttr) {
-    if (xattrMaxSize == 0) {
-      return;
-    }
-    int size = xAttr.getName().getBytes(Charsets.UTF_8).length;
-    if (xAttr.getValue() != null) {
-      size += xAttr.getValue().length;
-    }
-    if (size > xattrMaxSize) {
-      throw new HadoopIllegalArgumentException(
-          "The XAttr is too big. The maximum combined size of the"
-          + " name and value is " + xattrMaxSize
-          + ", but the total size is " + size);
-    }
-  }
-  
-  List<XAttr> getXAttrs(final String srcArg, List<XAttr> xAttrs)
+  List<XAttr> getXAttrs(final String src, List<XAttr> xAttrs)
       throws IOException {
-    String src = srcArg;
-    checkXAttrsConfigFlag();
-    FSPermissionChecker pc = getPermissionChecker();
-    final boolean isRawPath = FSDirectory.isReservedRawName(src);
-    boolean getAll = xAttrs == null || xAttrs.isEmpty();
-    if (!getAll) {
-      try {
-        XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs, isRawPath);
-      } catch (AccessControlException e) {
-        logAuditEvent(false, "getXAttrs", srcArg);
-        throw e;
-      }
-    }
     checkOperation(OperationCategory.READ);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath(src, true);
-      if (isPermissionEnabled) {
-        dir.checkPathAccess(pc, iip, FsAction.READ);
-      }
-      List<XAttr> all = dir.getXAttrs(src);
-      List<XAttr> filteredAll = XAttrPermissionFilter.
-          filterXAttrsForApi(pc, all, isRawPath);
-      if (getAll) {
-        return filteredAll;
-      } else {
-        if (filteredAll == null || filteredAll.isEmpty()) {
-          return null;
-        }
-        List<XAttr> toGet = Lists.newArrayListWithCapacity(xAttrs.size());
-        for (XAttr xAttr : xAttrs) {
-          boolean foundIt = false;
-          for (XAttr a : filteredAll) {
-            if (xAttr.getNameSpace() == a.getNameSpace()
-                && xAttr.getName().equals(a.getName())) {
-              toGet.add(a);
-              foundIt = true;
-              break;
-            }
-          }
-          if (!foundIt) {
-            throw new IOException(
-                "At least one of the attributes provided was not found.");
-          }
-        }
-        return toGet;
-      }
+      return FSDirXAttrOp.getXAttrs(dir, src, xAttrs);
     } catch (AccessControlException e) {
-      logAuditEvent(false, "getXAttrs", srcArg);
+      logAuditEvent(false, "getXAttrs", src);
       throw e;
     } finally {
       readUnlock();
@@ -8104,23 +7987,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   List<XAttr> listXAttrs(String src) throws IOException {
-    checkXAttrsConfigFlag();
-    final FSPermissionChecker pc = getPermissionChecker();
-    final boolean isRawPath = FSDirectory.isReservedRawName(src);
     checkOperation(OperationCategory.READ);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath(src, true);
-      if (isPermissionEnabled) {
-        /* To access xattr names, you need EXECUTE in the owning directory. */
-        dir.checkParentAccess(pc, iip, FsAction.EXECUTE);
-      }
-      final List<XAttr> all = dir.getXAttrs(src);
-      return XAttrPermissionFilter.
-        filterXAttrsForApi(pc, all, isRawPath);
+      return FSDirXAttrOp.listXAttrs(dir, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, "listXAttrs", src);
       throw e;
@@ -8129,77 +8000,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
-  /**
-   * Remove an xattr for a file or directory.
-   *
-   * @param src
-   *          - path to remove the xattr from
-   * @param xAttr
-   *          - xAttr to remove
-   * @throws AccessControlException
-   * @throws SafeModeException
-   * @throws UnresolvedLinkException
-   * @throws IOException
-   */
   void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
       throws IOException {
-    try {
-      removeXAttrInt(src, xAttr, logRetryCache);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "removeXAttr", src);
-      throw e;
-    }
-  }
-
-  void removeXAttrInt(final String srcArg, XAttr xAttr, boolean logRetryCache)
-      throws IOException {
-    String src = srcArg;
-    checkXAttrsConfigFlag();
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
-    XAttrPermissionFilter.checkPermissionForApi(pc, xAttr,
-        FSDirectory.isReservedRawName(src));
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    HdfsFileStatus auditStat = null;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      checkXAttrChangeAccess(iip, xAttr, pc);
-
-      List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
-      xAttrs.add(xAttr);
-      List<XAttr> removedXAttrs = dir.removeXAttrs(src, xAttrs);
-      if (removedXAttrs != null && !removedXAttrs.isEmpty()) {
-        getEditLog().logRemoveXAttrs(src, removedXAttrs, logRetryCache);
-      } else {
-        throw new IOException(
-            "No matching attributes found for remove operation");
-      }
-      resultingStat = getAuditFileInfo(src, false);
+      auditStat = FSDirXAttrOp.removeXAttr(dir, src, xAttr, logRetryCache);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "removeXAttr", src);
+      throw e;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "removeXAttr", srcArg, null, resultingStat);
-  }
-
-  private void checkXAttrChangeAccess(INodesInPath iip, XAttr xAttr,
-      FSPermissionChecker pc) throws AccessControlException {
-    if (isPermissionEnabled && xAttr.getNameSpace() == XAttr.NameSpace.USER) {
-      final INode inode = iip.getLastINode();
-      if (inode != null &&
-          inode.isDirectory() &&
-          inode.getFsPermission().getStickyBit()) {
-        if (!pc.isSuperUser()) {
-          dir.checkOwner(pc, iip);
-        }
-      } else {
-        dir.checkPathAccess(pc, iip, FsAction.WRITE);
-      }
-    }
+    logAuditEvent(true, "removeXAttr", src, null, auditStat);
   }
 
   void checkAccess(String src, FsAction mode) throws IOException {
@@ -8311,13 +8128,5 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
-  private void checkXAttrsConfigFlag() throws IOException {
-    if (!xattrsEnabled) {
-      throw new IOException(String.format(
-          "The XAttr operation has been rejected.  "
-              + "Support for XAttrs has been disabled by setting %s to false.",
-          DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY));
-    }
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c5bbd7a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java
index ad067cf..11905bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -201,8 +200,9 @@ public class TestFSDirectory {
     List<XAttr> newXAttrs = Lists.newArrayListWithCapacity(2);
     newXAttrs.add(newSystemXAttr);
     newXAttrs.add(newRawXAttr);
-    List<XAttr> xAttrs = fsdir.setINodeXAttrs(existingXAttrs, newXAttrs,
-        EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
+    List<XAttr> xAttrs = FSDirXAttrOp.setINodeXAttrs(fsdir, existingXAttrs,
+                                                     newXAttrs, EnumSet.of(
+            XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
     assertEquals(xAttrs.size(), 4);
     
     // Adding a trusted namespace xAttr, is affected by inode xAttrs limit.
@@ -211,8 +211,9 @@ public class TestFSDirectory {
         setValue(new byte[]{0x34, 0x34, 0x34}).build();
     newXAttrs.set(0, newXAttr1);
     try {
-      fsdir.setINodeXAttrs(existingXAttrs, newXAttrs,
-          EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
+      FSDirXAttrOp.setINodeXAttrs(fsdir, existingXAttrs, newXAttrs,
+                                  EnumSet.of(XAttrSetFlag.CREATE,
+                                             XAttrSetFlag.REPLACE));
       fail("Setting user visible xattr on inode should fail if " +
           "reaching limit.");
     } catch (IOException e) {
@@ -275,8 +276,9 @@ public class TestFSDirectory {
       for (int i = 0; i < toAdd.size(); i++) {
         LOG.info("Will add XAttr " + toAdd.get(i));
       }
-      List<XAttr> newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd,
-          EnumSet.of(XAttrSetFlag.CREATE));
+      List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsdir, existingXAttrs,
+                                                          toAdd, EnumSet.of(
+              XAttrSetFlag.CREATE));
       verifyXAttrsPresent(newXAttrs, numExpectedXAttrs);
       existingXAttrs = newXAttrs;
     }
@@ -296,8 +298,9 @@ public class TestFSDirectory {
       final int expectedNumToRemove = toRemove.size();
       LOG.info("Attempting to remove " + expectedNumToRemove + " XAttrs");
       List<XAttr> removedXAttrs = Lists.newArrayList();
-      List<XAttr> newXAttrs = fsdir.filterINodeXAttrs(existingXAttrs,
-          toRemove, removedXAttrs);
+      List<XAttr> newXAttrs = FSDirXAttrOp.filterINodeXAttrs(existingXAttrs,
+                                                             toRemove,
+                                                             removedXAttrs);
       assertEquals("Unexpected number of removed XAttrs",
           expectedNumToRemove, removedXAttrs.size());
       verifyXAttrsPresent(newXAttrs, numExpectedXAttrs);
@@ -316,8 +319,8 @@ public class TestFSDirectory {
     toAdd.add(generatedXAttrs.get(2));
     toAdd.add(generatedXAttrs.get(0));
     try {
-      fsdir.setINodeXAttrs(existingXAttrs, toAdd, EnumSet.of(XAttrSetFlag
-          .CREATE));
+      FSDirXAttrOp.setINodeXAttrs(fsdir, existingXAttrs, toAdd,
+                                  EnumSet.of(XAttrSetFlag.CREATE));
       fail("Specified the same xattr to be set twice");
     } catch (IOException e) {
       GenericTestUtils.assertExceptionContains("Cannot specify the same " +
@@ -328,15 +331,15 @@ public class TestFSDirectory {
     toAdd.remove(generatedXAttrs.get(0));
     existingXAttrs.add(generatedXAttrs.get(0));
     try {
-      fsdir.setINodeXAttrs(existingXAttrs, toAdd, EnumSet.of(XAttrSetFlag
-          .CREATE));
+      FSDirXAttrOp.setINodeXAttrs(fsdir, existingXAttrs, toAdd,
+                                  EnumSet.of(XAttrSetFlag.CREATE));
       fail("Set XAttr that is already set without REPLACE flag");
     } catch (IOException e) {
       GenericTestUtils.assertExceptionContains("already exists", e);
     }
     try {
-      fsdir.setINodeXAttrs(existingXAttrs, toAdd, EnumSet.of(XAttrSetFlag
-          .REPLACE));
+      FSDirXAttrOp.setINodeXAttrs(fsdir, existingXAttrs, toAdd,
+                                  EnumSet.of(XAttrSetFlag.REPLACE));
       fail("Set XAttr that does not exist without the CREATE flag");
     } catch (IOException e) {
       GenericTestUtils.assertExceptionContains("does not exist", e);
@@ -344,8 +347,9 @@ public class TestFSDirectory {
 
     // Sanity test for CREATE
     toAdd.remove(generatedXAttrs.get(0));
-    List<XAttr> newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd,
-        EnumSet.of(XAttrSetFlag.CREATE));
+    List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsdir, existingXAttrs,
+                                                        toAdd, EnumSet.of(
+            XAttrSetFlag.CREATE));
     assertEquals("Unexpected toAdd size", 2, toAdd.size());
     for (XAttr x : toAdd) {
       assertTrue("Did not find added XAttr " + x, newXAttrs.contains(x));
@@ -362,8 +366,8 @@ public class TestFSDirectory {
           .build();
       toAdd.add(xAttr);
     }
-    newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd,
-        EnumSet.of(XAttrSetFlag.REPLACE));
+    newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsdir, existingXAttrs, toAdd,
+                                            EnumSet.of(XAttrSetFlag.REPLACE));
     assertEquals("Unexpected number of new XAttrs", 3, newXAttrs.size());
     for (int i=0; i<3; i++) {
       assertArrayEquals("Unexpected XAttr value",
@@ -376,8 +380,9 @@ public class TestFSDirectory {
     for (int i=0; i<4; i++) {
       toAdd.add(generatedXAttrs.get(i));
     }
-    newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd,
-        EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
+    newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsdir, existingXAttrs, toAdd,
+                                            EnumSet.of(XAttrSetFlag.CREATE,
+                                                       XAttrSetFlag.REPLACE));
     verifyXAttrsPresent(newXAttrs, 4);
   }
 }