You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2014/12/15 19:49:22 UTC

hadoop git commit: HDFS-7506. Consolidate implementation of setting inode attributes into a single class. Contributed by Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 c9950f00a -> 282b5ed8e


HDFS-7506. Consolidate implementation of setting inode attributes into a single class. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/282b5ed8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/282b5ed8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/282b5ed8

Branch: refs/heads/branch-2
Commit: 282b5ed8ecf8810ee23d185548a5617c80e42d28
Parents: c9950f0
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Dec 15 10:40:33 2014 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Dec 15 10:49:10 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSDirAttrOp.java       | 455 +++++++++++++++++++
 .../server/namenode/FSDirStatAndListingOp.java  |   7 +-
 .../hdfs/server/namenode/FSDirectory.java       | 352 ++------------
 .../hdfs/server/namenode/FSEditLogLoader.java   |  36 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 243 ++--------
 6 files changed, 572 insertions(+), 524 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/282b5ed8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 87436c2..4af5591 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -333,6 +333,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7514. TestTextCommand fails on Windows. (Arpit Agarwal)
 
+    HDFS-7506. Consolidate implementation of setting inode attributes into a
+    single class. (wheat9)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/282b5ed8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
new file mode 100644
index 0000000..1e3c401
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.fs.PathIsNotDirectoryException;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.security.AccessControlException;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+
+public class FSDirAttrOp {
+  static HdfsFileStatus setPermission(
+      FSDirectory fsd, final String srcArg, FsPermission permission)
+      throws IOException {
+    String src = srcArg;
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    fsd.writeLock();
+    try {
+      src = fsd.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = fsd.getINodesInPath4Write(src);
+      fsd.checkOwner(pc, iip);
+      unprotectedSetPermission(fsd, src, permission);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logSetPermissions(src, permission);
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static HdfsFileStatus setOwner(
+      FSDirectory fsd, String src, String username, String group)
+      throws IOException {
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    fsd.writeLock();
+    try {
+      src = fsd.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = fsd.getINodesInPath4Write(src);
+      fsd.checkOwner(pc, iip);
+      if (!pc.isSuperUser()) {
+        if (username != null && !pc.getUser().equals(username)) {
+          throw new AccessControlException("Non-super user cannot change owner");
+        }
+        if (group != null && !pc.containsGroup(group)) {
+          throw new AccessControlException("User does not belong to " + group);
+        }
+      }
+      unprotectedSetOwner(fsd, src, username, group);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logSetOwner(src, username, group);
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static HdfsFileStatus setTimes(
+      FSDirectory fsd, String src, long mtime, long atime)
+      throws IOException {
+    if (!fsd.isAccessTimeSupported() && atime != -1) {
+      throw new IOException(
+          "Access time for hdfs is not configured. " +
+              " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY
+              + " configuration parameter.");
+    }
+
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+
+    fsd.writeLock();
+    try {
+      src = fsd.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = fsd.getINodesInPath4Write(src);
+      // Write access is required to set access and modification times
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+      }
+      final INode inode = iip.getLastINode();
+      if (inode == null) {
+        throw new FileNotFoundException("File/Directory " + src +
+                                            " does not exist.");
+      }
+      boolean changed = unprotectedSetTimes(fsd, inode, mtime, atime, true,
+                                            iip.getLatestSnapshotId());
+      if (changed) {
+        fsd.getEditLog().logTimes(src, mtime, atime);
+      }
+    } finally {
+      fsd.writeUnlock();
+    }
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static boolean setReplication(
+      FSDirectory fsd, BlockManager bm, String src, final short replication)
+      throws IOException {
+    bm.verifyReplication(src, replication, null);
+    final boolean isFile;
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    fsd.writeLock();
+    try {
+      src = fsd.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = fsd.getINodesInPath4Write(src);
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+      }
+
+      final short[] blockRepls = new short[2]; // 0: old, 1: new
+      final Block[] blocks = unprotectedSetReplication(fsd, src, replication,
+                                                       blockRepls);
+      isFile = blocks != null;
+      if (isFile) {
+        fsd.getEditLog().logSetReplication(src, replication);
+        bm.setReplication(blockRepls[0], blockRepls[1], src, blocks);
+      }
+    } finally {
+      fsd.writeUnlock();
+    }
+    return isFile;
+  }
+
+  static HdfsFileStatus setStoragePolicy(
+      FSDirectory fsd, BlockManager bm, String src, final String policyName)
+      throws IOException {
+    if (!fsd.isStoragePolicyEnabled()) {
+      throw new IOException(
+          "Failed to set storage policy since "
+              + DFS_STORAGE_POLICY_ENABLED_KEY + " is set to false.");
+    }
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    fsd.writeLock();
+    try {
+      src = FSDirectory.resolvePath(src, pathComponents, fsd);
+      final INodesInPath iip = fsd.getINodesInPath4Write(src);
+
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+      }
+
+      // get the corresponding policy and make sure the policy name is valid
+      BlockStoragePolicy policy = bm.getStoragePolicy(policyName);
+      if (policy == null) {
+        throw new HadoopIllegalArgumentException(
+            "Cannot find a block policy with the name " + policyName);
+      }
+      unprotectedSetStoragePolicy(fsd, bm, iip, policy.getId());
+      fsd.getEditLog().logSetStoragePolicy(src, policy.getId());
+    } finally {
+      fsd.writeUnlock();
+    }
+    return fsd.getAuditFileInfo(src, false);
+  }
+
+  static BlockStoragePolicy[] getStoragePolicies(BlockManager bm)
+      throws IOException {
+    return bm.getStoragePolicies();
+  }
+
+  static long getPreferredBlockSize(FSDirectory fsd, String src)
+      throws IOException {
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    fsd.readLock();
+    try {
+      src = fsd.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = fsd.getINodesInPath(src, false);
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkTraverse(pc, iip);
+      }
+      return INodeFile.valueOf(iip.getLastINode(), src)
+          .getPreferredBlockSize();
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  /**
+   * Set the namespace quota and diskspace quota for a directory.
+   *
+   * Note: This does not support ".inodes" relative path.
+   */
+  static void setQuota(FSDirectory fsd, String src, long nsQuota, long dsQuota)
+      throws IOException {
+    if (fsd.isPermissionEnabled()) {
+      FSPermissionChecker pc = fsd.getPermissionChecker();
+      pc.checkSuperuserPrivilege();
+    }
+
+    fsd.writeLock();
+    try {
+      INodeDirectory changed = unprotectedSetQuota(fsd, src, nsQuota, dsQuota);
+      if (changed != null) {
+        final Quota.Counts q = changed.getQuotaCounts();
+        fsd.getEditLog().logSetQuota(
+            src, q.get(Quota.NAMESPACE), q.get(Quota.DISKSPACE));
+      }
+    } finally {
+      fsd.writeUnlock();
+    }
+  }
+
+  static void unprotectedSetPermission(
+      FSDirectory fsd, String src, FsPermission permissions)
+      throws FileNotFoundException, UnresolvedLinkException,
+             QuotaExceededException, SnapshotAccessControlException {
+    assert fsd.hasWriteLock();
+    final INodesInPath inodesInPath = fsd.getINodesInPath4Write(src, true);
+    final INode inode = inodesInPath.getLastINode();
+    if (inode == null) {
+      throw new FileNotFoundException("File does not exist: " + src);
+    }
+    int snapshotId = inodesInPath.getLatestSnapshotId();
+    inode.setPermission(permissions, snapshotId);
+  }
+
+  static void unprotectedSetOwner(
+      FSDirectory fsd, String src, String username, String groupname)
+      throws FileNotFoundException, UnresolvedLinkException,
+      QuotaExceededException, SnapshotAccessControlException {
+    assert fsd.hasWriteLock();
+    final INodesInPath inodesInPath = fsd.getINodesInPath4Write(src, true);
+    INode inode = inodesInPath.getLastINode();
+    if (inode == null) {
+      throw new FileNotFoundException("File does not exist: " + src);
+    }
+    if (username != null) {
+      inode = inode.setUser(username, inodesInPath.getLatestSnapshotId());
+    }
+    if (groupname != null) {
+      inode.setGroup(groupname, inodesInPath.getLatestSnapshotId());
+    }
+  }
+
+  static boolean setTimes(
+      FSDirectory fsd, INode inode, long mtime, long atime, boolean force,
+      int latestSnapshotId) throws QuotaExceededException {
+    fsd.writeLock();
+    try {
+      return unprotectedSetTimes(fsd, inode, mtime, atime, force,
+                                 latestSnapshotId);
+    } finally {
+      fsd.writeUnlock();
+    }
+  }
+
+  static boolean unprotectedSetTimes(
+      FSDirectory fsd, String src, long mtime, long atime, boolean force)
+      throws UnresolvedLinkException, QuotaExceededException {
+    assert fsd.hasWriteLock();
+    final INodesInPath i = fsd.getINodesInPath(src, true);
+    return unprotectedSetTimes(fsd, i.getLastINode(), mtime, atime,
+                               force, i.getLatestSnapshotId());
+  }
+
+  /**
+   * See {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long)}
+   * for the contract.
+   * Sets quota for a directory.
+   * @return INodeDirectory if any of the quotas have changed. null otherwise.
+   * @throws FileNotFoundException if the path does not exist.
+   * @throws PathIsNotDirectoryException if the path is not a directory.
+   * @throws QuotaExceededException if the directory tree size is
+   *                                greater than the given quota
+   * @throws UnresolvedLinkException if a symlink is encountered in src.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  static INodeDirectory unprotectedSetQuota(
+      FSDirectory fsd, String src, long nsQuota, long dsQuota)
+      throws FileNotFoundException, PathIsNotDirectoryException,
+      QuotaExceededException, UnresolvedLinkException,
+      SnapshotAccessControlException {
+    assert fsd.hasWriteLock();
+    // sanity check
+    if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET &&
+         nsQuota != HdfsConstants.QUOTA_RESET) ||
+        (dsQuota < 0 && dsQuota != HdfsConstants.QUOTA_DONT_SET &&
+          dsQuota != HdfsConstants.QUOTA_RESET)) {
+      throw new IllegalArgumentException("Illegal value for nsQuota or " +
+                                         "dsQuota : " + nsQuota + " and " +
+                                         dsQuota);
+    }
+
+    String srcs = FSDirectory.normalizePath(src);
+    final INodesInPath iip = fsd.getINodesInPath4Write(srcs, true);
+    INodeDirectory dirNode = INodeDirectory.valueOf(iip.getLastINode(), srcs);
+    if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
+      throw new IllegalArgumentException("Cannot clear namespace quota on root.");
+    } else { // a directory inode
+      final Quota.Counts oldQuota = dirNode.getQuotaCounts();
+      final long oldNsQuota = oldQuota.get(Quota.NAMESPACE);
+      final long oldDsQuota = oldQuota.get(Quota.DISKSPACE);
+      if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
+        nsQuota = oldNsQuota;
+      }
+      if (dsQuota == HdfsConstants.QUOTA_DONT_SET) {
+        dsQuota = oldDsQuota;
+      }
+      if (oldNsQuota == nsQuota && oldDsQuota == dsQuota) {
+        return null;
+      }
+
+      final int latest = iip.getLatestSnapshotId();
+      dirNode.recordModification(latest);
+      dirNode.setQuota(nsQuota, dsQuota);
+      return dirNode;
+    }
+  }
+
+  static Block[] unprotectedSetReplication(
+      FSDirectory fsd, String src, short replication, short[] blockRepls)
+      throws QuotaExceededException, UnresolvedLinkException,
+             SnapshotAccessControlException {
+    assert fsd.hasWriteLock();
+
+    final INodesInPath iip = fsd.getINodesInPath4Write(src, true);
+    final INode inode = iip.getLastINode();
+    if (inode == null || !inode.isFile()) {
+      return null;
+    }
+    INodeFile file = inode.asFile();
+    final short oldBR = file.getBlockReplication();
+
+    // before setFileReplication, check for increasing block replication.
+    // if replication > oldBR, then newBR == replication.
+    // if replication < oldBR, we don't know newBR yet.
+    if (replication > oldBR) {
+      long dsDelta = (replication - oldBR)*(file.diskspaceConsumed()/oldBR);
+      fsd.updateCount(iip, 0, dsDelta, true);
+    }
+
+    file.setFileReplication(replication, iip.getLatestSnapshotId());
+
+    final short newBR = file.getBlockReplication();
+    // check newBR < oldBR case.
+    if (newBR < oldBR) {
+      long dsDelta = (newBR - oldBR)*(file.diskspaceConsumed()/newBR);
+      fsd.updateCount(iip, 0, dsDelta, true);
+    }
+
+    if (blockRepls != null) {
+      blockRepls[0] = oldBR;
+      blockRepls[1] = newBR;
+    }
+    return file.getBlocks();
+  }
+
+  static void unprotectedSetStoragePolicy(
+      FSDirectory fsd, BlockManager bm, INodesInPath iip, byte policyId)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    final INode inode = iip.getLastINode();
+    if (inode == null) {
+      throw new FileNotFoundException("File/Directory does not exist: "
+          + iip.getPath());
+    }
+    final int snapshotId = iip.getLatestSnapshotId();
+    if (inode.isFile()) {
+      BlockStoragePolicy newPolicy = bm.getStoragePolicy(policyId);
+      if (newPolicy.isCopyOnCreateFile()) {
+        throw new HadoopIllegalArgumentException(
+            "Policy " + newPolicy + " cannot be set after file creation.");
+      }
+
+      BlockStoragePolicy currentPolicy =
+          bm.getStoragePolicy(inode.getLocalStoragePolicyID());
+
+      if (currentPolicy != null && currentPolicy.isCopyOnCreateFile()) {
+        throw new HadoopIllegalArgumentException(
+            "Existing policy " + currentPolicy.getName() +
+                " cannot be changed after file creation.");
+      }
+      inode.asFile().setStoragePolicyID(policyId, snapshotId);
+    } else if (inode.isDirectory()) {
+      setDirStoragePolicy(fsd, inode.asDirectory(), policyId, snapshotId);
+    } else {
+      throw new FileNotFoundException(iip.getPath()
+          + " is not a file or directory");
+    }
+  }
+
+  private static void setDirStoragePolicy(
+      FSDirectory fsd, INodeDirectory inode, byte policyId,
+      int latestSnapshotId) throws IOException {
+    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+    XAttr xAttr = BlockStoragePolicySuite.buildXAttr(policyId);
+    List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
+                                                        Arrays.asList(xAttr),
+                                                        EnumSet.of(
+                                                            XAttrSetFlag.CREATE,
+                                                            XAttrSetFlag.REPLACE));
+    XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
+  }
+
+  private static boolean unprotectedSetTimes(
+      FSDirectory fsd, INode inode, long mtime, long atime, boolean force,
+      int latest) throws QuotaExceededException {
+    assert fsd.hasWriteLock();
+    boolean status = false;
+    if (mtime != -1) {
+      inode = inode.setModificationTime(mtime, latest);
+      status = true;
+    }
+    if (atime != -1) {
+      long inodeTime = inode.getAccessTime();
+
+      // if the last access time update was within the last precision interval, then
+      // no need to store access time
+      if (atime <= inodeTime + fsd.getFSNamesystem().getAccessTimePrecision()
+          && !force) {
+        status =  false;
+      } else {
+        inode.setAccessTime(atime, latest);
+        status = true;
+      }
+    }
+    return status;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/282b5ed8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 5bc790e..6ca30ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -139,6 +139,11 @@ class FSDirStatAndListingOp {
     return getContentSummaryInt(fsd, iip);
   }
 
+  private static byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
+    return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy :
+        parentPolicy;
+  }
+
   /**
    * Get a partial listing of the indicated directory
    *
@@ -196,7 +201,7 @@ class FSDirStatAndListingOp {
             cur.getLocalStoragePolicyID():
             BlockStoragePolicySuite.ID_UNSPECIFIED;
         listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur,
-            needLocation, fsd.getStoragePolicyID(curPolicy,
+            needLocation, getStoragePolicyID(curPolicy,
                 parentStoragePolicy), snapshot, isRawPath, iip);
         listingCnt++;
         if (needLocation) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/282b5ed8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index ee9bdd0..fbec786 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -17,20 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
-import static org.apache.hadoop.util.Time.now;
-
-import java.io.Closeable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -41,7 +30,6 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
@@ -54,8 +42,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
@@ -68,22 +54,35 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
+import static org.apache.hadoop.util.Time.now;
+
 /**
  * Both FSDirectory and FSNamesystem manage the state of the namespace.
  * FSDirectory is a pure in-memory data structure, all of whose operations
@@ -145,6 +144,12 @@ public class FSDirectory implements Closeable {
   private final boolean aclsEnabled;
   private final boolean xattrsEnabled;
   private final int xattrMaxSize;
+
+  // precision of access times.
+  private final long accessTimePrecision;
+  // whether setStoragePolicy is allowed.
+  private final boolean storagePolicyEnabled;
+
   private final String fsOwnerShortUserName;
   private final String supergroup;
   private final INodeId inodeId;
@@ -222,6 +227,15 @@ public class FSDirectory implements Closeable {
                                 DFSConfigKeys.DFS_NAMENODE_MAX_XATTR_SIZE_KEY);
     final String unlimited = xattrMaxSize == 0 ? " (unlimited)" : "";
     LOG.info("Maximum size of an xattr: " + xattrMaxSize + unlimited);
+
+    this.accessTimePrecision = conf.getLong(
+        DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
+        DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT);
+
+    this.storagePolicyEnabled =
+        conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
+                        DFS_STORAGE_POLICY_ENABLED_DEFAULT);
+
     int configuredLimit = conf.getInt(
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
     this.lsLimit = configuredLimit>0 ?
@@ -287,6 +301,13 @@ public class FSDirectory implements Closeable {
     return xattrsEnabled;
   }
   int getXattrMaxSize() { return xattrMaxSize; }
+  boolean isStoragePolicyEnabled() {
+    return storagePolicyEnabled;
+  }
+  boolean isAccessTimeSupported() {
+    return accessTimePrecision > 0;
+  }
+
 
   int getLsLimit() {
     return lsLimit;
@@ -524,172 +545,6 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * Set file replication
-   * 
-   * @param src file name
-   * @param replication new replication
-   * @param blockRepls block replications - output parameter
-   * @return array of file blocks
-   * @throws QuotaExceededException
-   * @throws SnapshotAccessControlException 
-   */
-  Block[] setReplication(String src, short replication, short[] blockRepls)
-      throws QuotaExceededException, UnresolvedLinkException,
-      SnapshotAccessControlException {
-    writeLock();
-    try {
-      return unprotectedSetReplication(src, replication, blockRepls);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  Block[] unprotectedSetReplication(String src, short replication,
-      short[] blockRepls) throws QuotaExceededException,
-      UnresolvedLinkException, SnapshotAccessControlException {
-    assert hasWriteLock();
-
-    final INodesInPath iip = getINodesInPath4Write(src, true);
-    final INode inode = iip.getLastINode();
-    if (inode == null || !inode.isFile()) {
-      return null;
-    }
-    INodeFile file = inode.asFile();
-    final short oldBR = file.getBlockReplication();
-
-    // before setFileReplication, check for increasing block replication.
-    // if replication > oldBR, then newBR == replication.
-    // if replication < oldBR, we don't know newBR yet. 
-    if (replication > oldBR) {
-      long dsDelta = (replication - oldBR)*(file.diskspaceConsumed()/oldBR);
-      updateCount(iip, 0, dsDelta, true);
-    }
-
-    file.setFileReplication(replication, iip.getLatestSnapshotId());
-    
-    final short newBR = file.getBlockReplication(); 
-    // check newBR < oldBR case. 
-    if (newBR < oldBR) {
-      long dsDelta = (newBR - oldBR)*(file.diskspaceConsumed()/newBR);
-      updateCount(iip, 0, dsDelta, true);
-    }
-
-    if (blockRepls != null) {
-      blockRepls[0] = oldBR;
-      blockRepls[1] = newBR;
-    }
-    return file.getBlocks();
-  }
-
-  /** Set block storage policy for a directory */
-  void setStoragePolicy(INodesInPath iip, byte policyId)
-      throws IOException {
-    writeLock();
-    try {
-      unprotectedSetStoragePolicy(iip, policyId);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  void unprotectedSetStoragePolicy(INodesInPath iip, byte policyId)
-      throws IOException {
-    assert hasWriteLock();
-    final INode inode = iip.getLastINode();
-    if (inode == null) {
-      throw new FileNotFoundException("File/Directory does not exist: "
-          + iip.getPath());
-    }
-    final int snapshotId = iip.getLatestSnapshotId();
-    if (inode.isFile()) {
-      BlockStoragePolicy newPolicy = getBlockManager().getStoragePolicy(policyId);
-      if (newPolicy.isCopyOnCreateFile()) {
-        throw new HadoopIllegalArgumentException(
-            "Policy " + newPolicy + " cannot be set after file creation.");
-      }
-
-      BlockStoragePolicy currentPolicy =
-          getBlockManager().getStoragePolicy(inode.getLocalStoragePolicyID());
-
-      if (currentPolicy != null && currentPolicy.isCopyOnCreateFile()) {
-        throw new HadoopIllegalArgumentException(
-            "Existing policy " + currentPolicy.getName() +
-                " cannot be changed after file creation.");
-      }
-      inode.asFile().setStoragePolicyID(policyId, snapshotId);
-    } else if (inode.isDirectory()) {
-      setDirStoragePolicy(inode.asDirectory(), policyId, snapshotId);  
-    } else {
-      throw new FileNotFoundException(iip.getPath()
-          + " is not a file or directory");
-    }
-  }
-
-  private void setDirStoragePolicy(INodeDirectory inode, byte policyId,
-      int latestSnapshotId) throws IOException {
-    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
-    XAttr xAttr = BlockStoragePolicySuite.buildXAttr(policyId);
-    List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(this, existingXAttrs,
-                                                        Arrays.asList(xAttr),
-                                                        EnumSet.of(
-                                                            XAttrSetFlag.CREATE,
-                                                            XAttrSetFlag.REPLACE));
-    XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
-  }
-
-  void setPermission(String src, FsPermission permission)
-      throws FileNotFoundException, UnresolvedLinkException,
-      QuotaExceededException, SnapshotAccessControlException {
-    writeLock();
-    try {
-      unprotectedSetPermission(src, permission);
-    } finally {
-      writeUnlock();
-    }
-  }
-  
-  void unprotectedSetPermission(String src, FsPermission permissions)
-      throws FileNotFoundException, UnresolvedLinkException,
-      QuotaExceededException, SnapshotAccessControlException {
-    assert hasWriteLock();
-    final INodesInPath inodesInPath = getINodesInPath4Write(src, true);
-    final INode inode = inodesInPath.getLastINode();
-    if (inode == null) {
-      throw new FileNotFoundException("File does not exist: " + src);
-    }
-    int snapshotId = inodesInPath.getLatestSnapshotId();
-    inode.setPermission(permissions, snapshotId);
-  }
-
-  void setOwner(String src, String username, String groupname)
-      throws FileNotFoundException, UnresolvedLinkException,
-      QuotaExceededException, SnapshotAccessControlException {
-    writeLock();
-    try {
-      unprotectedSetOwner(src, username, groupname);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  void unprotectedSetOwner(String src, String username, String groupname)
-      throws FileNotFoundException, UnresolvedLinkException,
-      QuotaExceededException, SnapshotAccessControlException {
-    assert hasWriteLock();
-    final INodesInPath inodesInPath = getINodesInPath4Write(src, true);
-    INode inode = inodesInPath.getLastINode();
-    if (inode == null) {
-      throw new FileNotFoundException("File does not exist: " + src);
-    }
-    if (username != null) {
-      inode = inode.setUser(username, inodesInPath.getLatestSnapshotId());
-    }
-    if (groupname != null) {
-      inode.setGroup(groupname, inodesInPath.getLatestSnapshotId());
-    }
-  }
-
-  /**
    * Delete the target directory and collect the blocks under it
    *
    * @param iip the INodesInPath instance containing all the INodes for the path
@@ -841,11 +696,6 @@ public class FSDirectory implements Closeable {
     return removed;
   }
 
-  byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
-    return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy :
-        parentPolicy;
-  }
-
   /** 
    * Check whether the filepath could be created
    * @throws SnapshotAccessControlException if path is in RO snapshot
@@ -895,7 +745,7 @@ public class FSDirectory implements Closeable {
     }
   }
   
-  private void updateCount(INodesInPath iip, long nsDelta, long dsDelta,
+  void updateCount(INodesInPath iip, long nsDelta, long dsDelta,
       boolean checkQuota) throws QuotaExceededException {
     updateCount(iip, iip.length() - 1, nsDelta, dsDelta, checkQuota);
   }
@@ -1315,77 +1165,7 @@ public class FSDirectory implements Closeable {
   int getInodeMapSize() {
     return inodeMap.size();
   }
-  
-  /**
-   * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
-   * Sets quota for for a directory.
-   * @return INodeDirectory if any of the quotas have changed. null otherwise.
-   * @throws FileNotFoundException if the path does not exist.
-   * @throws PathIsNotDirectoryException if the path is not a directory.
-   * @throws QuotaExceededException if the directory tree size is 
-   *                                greater than the given quota
-   * @throws UnresolvedLinkException if a symlink is encountered in src.
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   */
-  INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota)
-      throws FileNotFoundException, PathIsNotDirectoryException,
-      QuotaExceededException, UnresolvedLinkException,
-      SnapshotAccessControlException {
-    assert hasWriteLock();
-    // sanity check
-    if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET && 
-         nsQuota != HdfsConstants.QUOTA_RESET) || 
-        (dsQuota < 0 && dsQuota != HdfsConstants.QUOTA_DONT_SET && 
-          dsQuota != HdfsConstants.QUOTA_RESET)) {
-      throw new IllegalArgumentException("Illegal value for nsQuota or " +
-                                         "dsQuota : " + nsQuota + " and " +
-                                         dsQuota);
-    }
-    
-    String srcs = normalizePath(src);
-    final INodesInPath iip = getINodesInPath4Write(srcs, true);
-    INodeDirectory dirNode = INodeDirectory.valueOf(iip.getLastINode(), srcs);
-    if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
-      throw new IllegalArgumentException("Cannot clear namespace quota on root.");
-    } else { // a directory inode
-      final Quota.Counts oldQuota = dirNode.getQuotaCounts();
-      final long oldNsQuota = oldQuota.get(Quota.NAMESPACE);
-      final long oldDsQuota = oldQuota.get(Quota.DISKSPACE);
-      if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
-        nsQuota = oldNsQuota;
-      }
-      if (dsQuota == HdfsConstants.QUOTA_DONT_SET) {
-        dsQuota = oldDsQuota;
-      }        
-      if (oldNsQuota == nsQuota && oldDsQuota == dsQuota) {
-        return null;
-      }
 
-      final int latest = iip.getLatestSnapshotId();
-      dirNode.recordModification(latest);
-      dirNode.setQuota(nsQuota, dsQuota);
-      return dirNode;
-    }
-  }
-  
-  /**
-   * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
-   * @return INodeDirectory if any of the quotas have changed. null otherwise.
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   * @see #unprotectedSetQuota(String, long, long)
-   */
-  INodeDirectory setQuota(String src, long nsQuota, long dsQuota)
-      throws FileNotFoundException, PathIsNotDirectoryException,
-      QuotaExceededException, UnresolvedLinkException,
-      SnapshotAccessControlException {
-    writeLock();
-    try {
-      return unprotectedSetQuota(src, nsQuota, dsQuota);
-    } finally {
-      writeUnlock();
-    }
-  }
-  
   long totalInodes() {
     readLock();
     try {
@@ -1397,50 +1177,6 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * Sets the access time on the file/directory. Logs it in the transaction log.
-   */
-  boolean setTimes(INode inode, long mtime, long atime, boolean force,
-                   int latestSnapshotId) throws QuotaExceededException {
-    writeLock();
-    try {
-      return unprotectedSetTimes(inode, mtime, atime, force, latestSnapshotId);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
-      throws UnresolvedLinkException, QuotaExceededException {
-    assert hasWriteLock();
-    final INodesInPath i = getINodesInPath(src, true);
-    return unprotectedSetTimes(i.getLastINode(), mtime, atime, force,
-        i.getLatestSnapshotId());
-  }
-
-  private boolean unprotectedSetTimes(INode inode, long mtime,
-      long atime, boolean force, int latest) throws QuotaExceededException {
-    assert hasWriteLock();
-    boolean status = false;
-    if (mtime != -1) {
-      inode = inode.setModificationTime(mtime, latest);
-      status = true;
-    }
-    if (atime != -1) {
-      long inodeTime = inode.getAccessTime();
-
-      // if the last access time update was within the last precision interval, then
-      // no need to store access time
-      if (atime <= inodeTime + getFSNamesystem().getAccessTimePrecision() && !force) {
-        status =  false;
-      } else {
-        inode.setAccessTime(atime, latest);
-        status = true;
-      }
-    } 
-    return status;
-  }
-
-  /**
    * Reset the entire namespace tree.
    */
   void reset() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/282b5ed8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 13f4eb5..b610cc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -483,9 +483,8 @@ public class FSEditLogLoader {
       SetReplicationOp setReplicationOp = (SetReplicationOp)op;
       short replication = fsNamesys.getBlockManager().adjustReplication(
           setReplicationOp.replication);
-      fsDir.unprotectedSetReplication(
-          renameReservedPathsOnUpgrade(setReplicationOp.path, logVersion),
-                                      replication, null);
+      FSDirAttrOp.unprotectedSetReplication(fsDir, renameReservedPathsOnUpgrade(
+          setReplicationOp.path, logVersion), replication, null);
       break;
     }
     case OP_CONCAT_DELETE: {
@@ -543,45 +542,42 @@ public class FSEditLogLoader {
     }
     case OP_SET_PERMISSIONS: {
       SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
-      fsDir.unprotectedSetPermission(
-          renameReservedPathsOnUpgrade(setPermissionsOp.src, logVersion),
-          setPermissionsOp.permissions);
+      FSDirAttrOp.unprotectedSetPermission(fsDir, renameReservedPathsOnUpgrade(
+          setPermissionsOp.src, logVersion), setPermissionsOp.permissions);
       break;
     }
     case OP_SET_OWNER: {
       SetOwnerOp setOwnerOp = (SetOwnerOp)op;
-      fsDir.unprotectedSetOwner(
-          renameReservedPathsOnUpgrade(setOwnerOp.src, logVersion),
+      FSDirAttrOp.unprotectedSetOwner(
+          fsDir, renameReservedPathsOnUpgrade(setOwnerOp.src, logVersion),
           setOwnerOp.username, setOwnerOp.groupname);
       break;
     }
     case OP_SET_NS_QUOTA: {
       SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
-      fsDir.unprotectedSetQuota(
-          renameReservedPathsOnUpgrade(setNSQuotaOp.src, logVersion),
+      FSDirAttrOp.unprotectedSetQuota(
+          fsDir, renameReservedPathsOnUpgrade(setNSQuotaOp.src, logVersion),
           setNSQuotaOp.nsQuota, HdfsConstants.QUOTA_DONT_SET);
       break;
     }
     case OP_CLEAR_NS_QUOTA: {
       ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
-      fsDir.unprotectedSetQuota(
-          renameReservedPathsOnUpgrade(clearNSQuotaOp.src, logVersion),
+      FSDirAttrOp.unprotectedSetQuota(
+          fsDir, renameReservedPathsOnUpgrade(clearNSQuotaOp.src, logVersion),
           HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_DONT_SET);
       break;
     }
 
     case OP_SET_QUOTA:
       SetQuotaOp setQuotaOp = (SetQuotaOp)op;
-      fsDir.unprotectedSetQuota(
-          renameReservedPathsOnUpgrade(setQuotaOp.src, logVersion),
-          setQuotaOp.nsQuota, setQuotaOp.dsQuota);
+      FSDirAttrOp.unprotectedSetQuota(fsDir, renameReservedPathsOnUpgrade(
+          setQuotaOp.src, logVersion), setQuotaOp.nsQuota, setQuotaOp.dsQuota);
       break;
 
     case OP_TIMES: {
       TimesOp timesOp = (TimesOp)op;
-
-      fsDir.unprotectedSetTimes(
-          renameReservedPathsOnUpgrade(timesOp.path, logVersion),
+      FSDirAttrOp.unprotectedSetTimes(
+          fsDir, renameReservedPathsOnUpgrade(timesOp.path, logVersion),
           timesOp.mtime, timesOp.atime, true);
       break;
     }
@@ -860,7 +856,9 @@ public class FSEditLogLoader {
       final String path = renameReservedPathsOnUpgrade(setStoragePolicyOp.path,
           logVersion);
       final INodesInPath iip = fsDir.getINodesInPath4Write(path);
-      fsDir.unprotectedSetStoragePolicy(iip, setStoragePolicyOp.policyId);
+      FSDirAttrOp.unprotectedSetStoragePolicy(
+          fsDir, fsNamesys.getBlockManager(), iip,
+          setStoragePolicyOp.policyId);
       break;
     }
     default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/282b5ed8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b432266..e2e0eb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -86,8 +86,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
@@ -416,9 +414,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private final CacheManager cacheManager;
   private final DatanodeStatistics datanodeStatistics;
 
-  // whether setStoragePolicy is allowed.
-  private final boolean isStoragePolicyEnabled;
-
   private String nameserviceId;
 
   private RollingUpgradeInfo rollingUpgradeInfo = null;
@@ -742,10 +737,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
       this.blockIdManager = new BlockIdManager(blockManager);
 
-      this.isStoragePolicyEnabled =
-          conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
-                          DFS_STORAGE_POLICY_ENABLED_DEFAULT);
-
       this.fsOwner = UserGroupInformation.getCurrentUser();
       this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
                                  DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
@@ -1664,36 +1655,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws IOException
    */
   void setPermission(String src, FsPermission permission) throws IOException {
-    try {
-      setPermissionInt(src, permission);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "setPermission", src);
-      throw e;
-    }
-  }
-
-  private void setPermissionInt(final String srcArg, FsPermission permission)
-      throws IOException {
-    String src = srcArg;
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+    HdfsFileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set permission for " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      dir.checkOwner(pc, iip);
-      dir.setPermission(src, permission);
-      getEditLog().logSetPermissions(src, permission);
-      resultingStat = getAuditFileInfo(src, false);
+      auditStat = FSDirAttrOp.setPermission(dir, src, permission);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setPermission", src);
+      throw e;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "setPermission", srcArg, null, resultingStat);
+    logAuditEvent(true, "setPermission", src, null, auditStat);
   }
 
   /**
@@ -1702,44 +1678,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   void setOwner(String src, String username, String group)
       throws IOException {
-    try {
-      setOwnerInt(src, username, group);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "setOwner", src);
-      throw e;
-    } 
-  }
-
-  private void setOwnerInt(final String srcArg, String username, String group)
-      throws IOException {
-    String src = srcArg;
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+    HdfsFileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set owner for " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      dir.checkOwner(pc, iip);
-      if (!pc.isSuperUser()) {
-        if (username != null && !pc.getUser().equals(username)) {
-          throw new AccessControlException("Non-super user cannot change owner");
-        }
-        if (group != null && !pc.containsGroup(group)) {
-          throw new AccessControlException("User does not belong to " + group);
-        }
-      }
-      dir.setOwner(src, username, group);
-      getEditLog().logSetOwner(src, username, group);
-      resultingStat = getAuditFileInfo(src, false);
+      auditStat = FSDirAttrOp.setOwner(dir, src, username, group);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setOwner", src);
+      throw e;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "setOwner", srcArg, null, resultingStat);
+    logAuditEvent(true, "setOwner", src, null, auditStat);
   }
 
   static class GetBlockLocationsResult {
@@ -1784,7 +1737,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         boolean updateAccessTime = now > inode.getAccessTime() +
             getAccessTimePrecision();
         if (!isInSafeMode() && updateAccessTime) {
-          boolean changed = dir.setTimes(
+          boolean changed = FSDirAttrOp.setTimes(dir,
               inode, -1, now, false, res.iip.getLatestSnapshotId());
           if (changed) {
             getEditLog().logTimes(src, -1, now);
@@ -1931,52 +1884,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * The access time is precise up to an hour. The transaction, if needed, is
    * written to the edits log but is not flushed.
    */
-  void setTimes(String src, long mtime, long atime) 
-      throws IOException, UnresolvedLinkException {
-    if (!isAccessTimeSupported() && atime != -1) {
-      throw new IOException("Access time for hdfs is not configured. " +
-                            " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
-    }
-    try {
-      setTimesInt(src, mtime, atime);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "setTimes", src);
-      throw e;
-    }
-  }
-
-  private void setTimesInt(final String srcArg, long mtime, long atime)
-    throws IOException {
-    String src = srcArg;
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+  void setTimes(String src, long mtime, long atime) throws IOException {
+    HdfsFileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set times " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      // Write access is required to set access and modification times
-      if (isPermissionEnabled) {
-        dir.checkPathAccess(pc, iip, FsAction.WRITE);
-      }
-      final INode inode = iip.getLastINode();
-      if (inode != null) {
-        boolean changed = dir.setTimes(inode, mtime, atime, true,
-                iip.getLatestSnapshotId());
-        if (changed) {
-          getEditLog().logTimes(src, mtime, atime);
-        }
-        resultingStat = getAuditFileInfo(src, false);
-      } else {
-        throw new FileNotFoundException("File/Directory " + src + " does not exist.");
-      }
+      auditStat = FSDirAttrOp.setTimes(dir, src, mtime, atime);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setTimes", src);
+      throw e;
     } finally {
       writeUnlock();
     }
-    logAuditEvent(true, "setTimes", srcArg, null, resultingStat);
+    getEditLog().logSync();
+    logAuditEvent(true, "setTimes", src, null, auditStat);
   }
 
   /**
@@ -2060,49 +1983,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   boolean setReplication(final String src, final short replication)
       throws IOException {
-    try {
-      return setReplicationInt(src, replication);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "setReplication", src);
-      throw e;
-    }
-  }
-
-  private boolean setReplicationInt(final String srcArg,
-      final short replication) throws IOException {
-    String src = srcArg;
-    blockManager.verifyReplication(src, replication, null);
-    final boolean isFile;
-    FSPermissionChecker pc = getPermissionChecker();
-    checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    boolean success = false;
     waitForLoadingFSImage();
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set replication for " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      if (isPermissionEnabled) {
-        dir.checkPathAccess(pc, iip, FsAction.WRITE);
-      }
-
-      final short[] blockRepls = new short[2]; // 0: old, 1: new
-      final Block[] blocks = dir.setReplication(src, replication, blockRepls);
-      isFile = blocks != null;
-      if (isFile) {
-        getEditLog().logSetReplication(src, replication);
-        blockManager.setReplication(blockRepls[0], blockRepls[1], src, blocks);
-      }
+      success = FSDirAttrOp.setReplication(dir, blockManager, src, replication);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setReplication", src);
+      throw e;
     } finally {
       writeUnlock();
     }
-
-    getEditLog().logSync();
-    if (isFile) {
-      logAuditEvent(true, "setReplication", srcArg);
+    if (success) {
+      getEditLog().logSync();
+      logAuditEvent(true, "setReplication", src);
     }
-    return isFile;
+    return success;
   }
 
   /**
@@ -2111,58 +2010,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @param src file/directory path
    * @param policyName storage policy name
    */
-  void setStoragePolicy(String src, final String policyName)
-      throws IOException {
-    try {
-      setStoragePolicyInt(src, policyName);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "setStoragePolicy", src);
-      throw e;
-    }
-  }
-
-  private void setStoragePolicyInt(String src, final String policyName)
-      throws IOException {
-    if (!isStoragePolicyEnabled) {
-      throw new IOException("Failed to set storage policy since "
-          + DFS_STORAGE_POLICY_ENABLED_KEY + " is set to false.");
-    }
-    FSPermissionChecker pc = null;
-    if (isPermissionEnabled) {
-      pc = getPermissionChecker();
-    }
-
-    checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+  void setStoragePolicy(String src, String policyName) throws IOException {
+    HdfsFileStatus auditStat;
     waitForLoadingFSImage();
-    HdfsFileStatus fileStat;
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set storage policy for " + src);
-
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-
-      if (pc != null) {
-        dir.checkPermission(pc, iip, false, null, null, FsAction.WRITE, null, false);
-      }
-
-      // get the corresponding policy and make sure the policy name is valid
-      BlockStoragePolicy policy = blockManager.getStoragePolicy(policyName);
-      if (policy == null) {
-        throw new HadoopIllegalArgumentException(
-            "Cannot find a block policy with the name " + policyName);
-      }
-      dir.setStoragePolicy(iip, policy.getId());
-      getEditLog().logSetStoragePolicy(src, policy.getId());
-      fileStat = getAuditFileInfo(src, false);
+      auditStat = FSDirAttrOp.setStoragePolicy(
+          dir, blockManager, src, policyName);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setStoragePolicy", src);
+      throw e;
     } finally {
       writeUnlock();
     }
-
     getEditLog().logSync();
-    logAuditEvent(true, "setStoragePolicy", src, null, fileStat);
+    logAuditEvent(true, "setStoragePolicy", src, null, auditStat);
   }
 
   /**
@@ -2174,26 +2039,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      return blockManager.getStoragePolicies();
+      return FSDirAttrOp.getStoragePolicies(blockManager);
     } finally {
       readUnlock();
     }
   }
 
-  long getPreferredBlockSize(String filename) throws IOException {
-    FSPermissionChecker pc = getPermissionChecker();
+  long getPreferredBlockSize(String src) throws IOException {
     checkOperation(OperationCategory.READ);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(filename);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      filename = dir.resolvePath(pc, filename, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath(filename, false);
-      if (isPermissionEnabled) {
-        dir.checkTraverse(pc, iip);
-      }
-      return INodeFile.valueOf(iip.getLastINode(), filename)
-          .getPreferredBlockSize();
+      return FSDirAttrOp.getPreferredBlockSize(dir, src);
     } finally {
       readUnlock();
     }
@@ -3857,20 +3714,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * 
    * Note: This does not support ".inodes" relative path.
    */
-  void setQuota(String path, long nsQuota, long dsQuota)
-      throws IOException, UnresolvedLinkException {
-    checkSuperuserPrivilege();
+  void setQuota(String src, long nsQuota, long dsQuota)
+      throws IOException {
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      checkNameNodeSafeMode("Cannot set quota on " + path);
-      INodeDirectory changed = dir.setQuota(path, nsQuota, dsQuota);
-      if (changed != null) {
-        final Quota.Counts q = changed.getQuotaCounts();
-        getEditLog().logSetQuota(path,
-                q.get(Quota.NAMESPACE), q.get(Quota.DISKSPACE));
-      }
+      checkNameNodeSafeMode("Cannot set quota on " + src);
+      FSDirAttrOp.setQuota(dir, src, nsQuota, dsQuota);
     } finally {
       writeUnlock();
     }