You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2016/10/25 02:26:35 UTC

[25/50] [abbrv] hadoop git commit: HDFS-10997. Reduce number of path resolving methods. Contributed by Daryn Sharp.

HDFS-10997. Reduce number of path resolving methods. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d175853
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d175853
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d175853

Branch: refs/heads/YARN-2915
Commit: 9d175853b0170683ad5f21d9bcdeaac49fe89e04
Parents: a1a0281
Author: Kihwal Lee <ki...@apache.org>
Authored: Mon Oct 24 17:14:51 2016 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Mon Oct 24 17:14:51 2016 -0500

----------------------------------------------------------------------
 .../CacheReplicationMonitor.java                |  14 +-
 .../hdfs/server/namenode/CacheManager.java      |   8 +-
 .../server/namenode/EncryptionZoneManager.java  |   3 +-
 .../hadoop/hdfs/server/namenode/FSDirAclOp.java |  28 ++-
 .../hdfs/server/namenode/FSDirAppendOp.java     |   3 +-
 .../hdfs/server/namenode/FSDirAttrOp.java       |  20 +-
 .../hdfs/server/namenode/FSDirConcatOp.java     |   8 +-
 .../hdfs/server/namenode/FSDirDeleteOp.java     |  15 +-
 .../server/namenode/FSDirEncryptionZoneOp.java  |   5 +-
 .../server/namenode/FSDirErasureCodingOp.java   |   5 +-
 .../hdfs/server/namenode/FSDirMkdirOp.java      |  18 +-
 .../hdfs/server/namenode/FSDirRenameOp.java     |  24 +--
 .../hdfs/server/namenode/FSDirSnapshotOp.java   |  17 +-
 .../server/namenode/FSDirStatAndListingOp.java  |  49 ++---
 .../hdfs/server/namenode/FSDirSymlinkOp.java    |   3 +-
 .../hdfs/server/namenode/FSDirTruncateOp.java   |   9 +-
 .../hdfs/server/namenode/FSDirWriteFileOp.java  |   3 +-
 .../hdfs/server/namenode/FSDirXAttrOp.java      |  12 +-
 .../hdfs/server/namenode/FSDirectory.java       | 191 +++++++++++--------
 .../hdfs/server/namenode/FSEditLogLoader.java   |  54 +++---
 .../hdfs/server/namenode/FSImageFormat.java     |  17 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  21 +-
 .../server/namenode/FSPermissionChecker.java    | 165 ++++++++++++----
 .../hdfs/server/namenode/INodesInPath.java      |  90 +++------
 .../namenode/snapshot/SnapshotManager.java      |   5 +-
 .../org/apache/hadoop/hdfs/TestFileStatus.java  |   4 +-
 .../hadoop/hdfs/TestReservedRawPaths.java       |   5 +-
 .../hdfs/server/namenode/FSAclBaseTest.java     |   7 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   5 +-
 .../hdfs/server/namenode/TestFSDirectory.java   |  37 ++--
 .../namenode/TestFSPermissionChecker.java       |   5 +-
 .../hdfs/server/namenode/TestFileTruncate.java  |   5 +-
 .../hadoop/hdfs/server/namenode/TestFsck.java   |   6 +-
 .../server/namenode/TestGetBlockLocations.java  |   5 +-
 .../server/namenode/TestSnapshotPathINodes.java |   8 +
 .../namenode/snapshot/SnapshotTestHelper.java   |  10 +-
 .../snapshot/TestSnapshotReplication.java       |   3 +-
 .../hadoop/security/TestPermissionSymlinks.java |   7 +-
 38 files changed, 509 insertions(+), 385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 8563cf3..35e4a2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -35,7 +35,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hdfs.server.namenode.CacheManager;
 import org.apache.hadoop.hdfs.server.namenode.CachePool;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
@@ -56,7 +56,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-;
 
 /**
  * Scans the namesystem, scheduling blocks to be cached as appropriate.
@@ -334,12 +333,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       String path = directive.getPath();
       INode node;
       try {
-        node = fsDir.getINode(path);
-      } catch (UnresolvedLinkException e) {
-        // We don't cache through symlinks
-        LOG.debug("Directive {}: got UnresolvedLinkException while resolving "
-                + "path {}", directive.getId(), path
-        );
+        node = fsDir.getINode(path, DirOp.READ);
+      } catch (IOException e) {
+        // We don't cache through symlinks or invalid paths
+        LOG.debug("Directive {}: Failed to resolve path {} ({})",
+            directive.getId(), path, e.getMessage());
         continue;
       }
       if (node == null)  {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 24bf751..fa8f011 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -72,6 +71,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@@ -417,9 +417,9 @@ public final class CacheManager {
     long requestedFiles = 0;
     CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
     try {
-      node = fsDir.getINode(path);
-    } catch (UnresolvedLinkException e) {
-      // We don't cache through symlinks
+      node = fsDir.getINode(path, DirOp.READ);
+    } catch (IOException e) {
+      // We don't cache through symlinks or invalid paths
       return builder.build();
     }
     if (node == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
index ceeccf6..d23963d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -370,7 +371,7 @@ public class EncryptionZoneManager {
        contain a reference INode.
       */
       final String pathName = getFullPathName(ezi);
-      INodesInPath iip = dir.getINodesInPath(pathName, false);
+      INodesInPath iip = dir.getINodesInPath(pathName, DirOp.READ_LINK);
       INode lastINode = iip.getLastINode();
       if (lastINode == null || lastINode.getId() != ezi.getINodeId()) {
         continue;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
index afafd78..25ca09b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -41,7 +42,7 @@ class FSDirAclOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       fsd.checkOwner(pc, iip);
       INode inode = FSDirectory.resolveLastINode(iip);
@@ -66,7 +67,7 @@ class FSDirAclOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       fsd.checkOwner(pc, iip);
       INode inode = FSDirectory.resolveLastINode(iip);
@@ -90,7 +91,7 @@ class FSDirAclOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       fsd.checkOwner(pc, iip);
       INode inode = FSDirectory.resolveLastINode(iip);
@@ -114,7 +115,7 @@ class FSDirAclOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       fsd.checkOwner(pc, iip);
       unprotectedRemoveAcl(fsd, iip);
@@ -134,11 +135,10 @@ class FSDirAclOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
-      src = iip.getPath();
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       fsd.checkOwner(pc, iip);
-      List<AclEntry> newAcl = unprotectedSetAcl(fsd, src, aclSpec, false);
-      fsd.getEditLog().logSetAcl(src, newAcl);
+      List<AclEntry> newAcl = unprotectedSetAcl(fsd, iip, aclSpec, false);
+      fsd.getEditLog().logSetAcl(iip.getPath(), newAcl);
     } finally {
       fsd.writeUnlock();
     }
@@ -151,15 +151,12 @@ class FSDirAclOp {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
-      INodesInPath iip = fsd.resolvePath(pc, src);
+      INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
       // There is no real inode for the path ending in ".snapshot", so return a
       // non-null, unpopulated AclStatus.  This is similar to getFileInfo.
       if (iip.isDotSnapshotDir() && fsd.getINode4DotSnapshot(iip) != null) {
         return new AclStatus.Builder().owner("").group("").build();
       }
-      if (fsd.isPermissionEnabled()) {
-        fsd.checkTraverse(pc, iip);
-      }
       INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getPathSnapshotId();
       List<AclEntry> acl = AclStorage.readINodeAcl(fsd.getAttributes(iip));
@@ -174,12 +171,9 @@ class FSDirAclOp {
     }
   }
 
-  static List<AclEntry> unprotectedSetAcl(
-      FSDirectory fsd, String src, List<AclEntry> aclSpec, boolean fromEdits)
-      throws IOException {
+  static List<AclEntry> unprotectedSetAcl(FSDirectory fsd, INodesInPath iip,
+      List<AclEntry> aclSpec, boolean fromEdits) throws IOException {
     assert fsd.hasWriteLock();
-    final INodesInPath iip = fsd.getINodesInPath4Write(
-        FSDirectory.normalizePath(src), true);
 
     // ACL removal is logged to edits as OP_SET_ACL with an empty list.
     if (aclSpec.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
index 6f898ef..9926ee0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
 import org.apache.hadoop.ipc.RetriableException;
@@ -87,7 +88,7 @@ final class FSDirAppendOp {
     final INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, srcArg);
+      iip = fsd.resolvePath(pc, srcArg, DirOp.WRITE);
       // Verify that the destination does not exist as a directory already
       final INode inode = iip.getLastINode();
       final String path = iip.getPath();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 91d9bce..417ce01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -59,7 +60,7 @@ public class FSDirAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       fsd.checkOwner(pc, iip);
       unprotectedSetPermission(fsd, iip, permission);
     } finally {
@@ -79,7 +80,7 @@ public class FSDirAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       fsd.checkOwner(pc, iip);
       if (!pc.isSuperUser()) {
         if (username != null && !pc.getUser().equals(username)) {
@@ -105,7 +106,7 @@ public class FSDirAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       // Write access is required to set access and modification times
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
@@ -133,7 +134,7 @@ public class FSDirAttrOp {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.writeLock();
     try {
-      final INodesInPath iip = fsd.resolvePathForWrite(pc, src);
+      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
       }
@@ -180,7 +181,7 @@ public class FSDirAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
 
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
@@ -204,7 +205,7 @@ public class FSDirAttrOp {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
-      final INodesInPath iip = fsd.resolvePath(pc, path, false);
+      final INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ_LINK);
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.READ);
       }
@@ -224,10 +225,7 @@ public class FSDirAttrOp {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
-      final INodesInPath iip = fsd.resolvePath(pc, src, false);
-      if (fsd.isPermissionEnabled()) {
-        fsd.checkTraverse(pc, iip);
-      }
+      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
       return INodeFile.valueOf(iip.getLastINode(), iip.getPath())
           .getPreferredBlockSize();
     } finally {
@@ -249,7 +247,7 @@ public class FSDirAttrOp {
 
     fsd.writeLock();
     try {
-      INodesInPath iip = fsd.resolvePathForWrite(pc, src);
+      INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       INodeDirectory changed =
           unprotectedSetQuota(fsd, iip, nsQuota, ssQuota, type);
       if (changed != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
index 5310b94..40df120 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -54,11 +55,10 @@ class FSDirConcatOp {
     if (FSDirectory.LOG.isDebugEnabled()) {
       FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
     }
-    final INodesInPath targetIIP = fsd.getINodesInPath4Write(target);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    final INodesInPath targetIIP = fsd.resolvePath(pc, target, DirOp.WRITE);
     // write permission for the target
-    FSPermissionChecker pc = null;
     if (fsd.isPermissionEnabled()) {
-      pc = fsd.getPermissionChecker();
       fsd.checkPathAccess(pc, targetIIP, FsAction.WRITE);
     }
 
@@ -125,7 +125,7 @@ class FSDirConcatOp {
     final INodeDirectory targetParent = targetINode.getParent();
     // now check the srcs
     for(String src : srcs) {
-      final INodesInPath iip = fsd.getINodesInPath4Write(src);
+      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       // permission check for srcs
       if (pc != null) {
         fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
index 328ce79..a83a8b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
@@ -18,15 +18,18 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INode.ReclaimContext;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.ChunkedArrayList;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -102,7 +105,7 @@ class FSDirDeleteOp {
       throw new InvalidPathException(src);
     }
 
-    final INodesInPath iip = fsd.resolvePathForWrite(pc, src, false);
+    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
     if (fsd.isPermissionEnabled()) {
       fsd.checkPermission(pc, iip, false, null, FsAction.WRITE, null,
                           FsAction.ALL, true);
@@ -276,10 +279,14 @@ class FSDirDeleteOp {
    * @param iip directory whose descendants are to be checked.
    * @throws AccessControlException if a non-empty protected descendant
    *                                was found.
+   * @throws ParentNotDirectoryException if a path parent is not a directory
+   * @throws UnresolvedLinkException if a descendant path contains a symlink
+   * @throws FileNotFoundException
    */
   private static void checkProtectedDescendants(
       FSDirectory fsd, INodesInPath iip)
-          throws AccessControlException, UnresolvedLinkException {
+          throws AccessControlException, UnresolvedLinkException,
+          ParentNotDirectoryException {
     final SortedSet<String> protectedDirs = fsd.getProtectedDirectories();
     if (protectedDirs.isEmpty()) {
       return;
@@ -298,8 +305,8 @@ class FSDirDeleteOp {
     // character after '/'.
     for (String descendant :
             protectedDirs.subSet(src + Path.SEPARATOR, src + "0")) {
-      if (fsd.isNonEmptyDirectory(fsd.getINodesInPath4Write(
-              descendant, false))) {
+      INodesInPath subdirIIP = fsd.getINodesInPath(descendant, DirOp.WRITE);
+      if (fsd.isNonEmptyDirectory(subdirIIP)) {
         throw new AccessControlException(
             "Cannot delete non-empty protected subdirectory " + descendant);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
index d7a3611..d5f6be0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.security.SecurityUtil;
 
 import com.google.common.base.Preconditions;
@@ -157,7 +158,7 @@ final class FSDirEncryptionZoneOp {
     final INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, srcArg);
+      iip = fsd.resolvePath(pc, srcArg, DirOp.WRITE);
       final XAttr ezXAttr = fsd.ezManager.createEncryptionZone(iip, suite,
           version, keyName);
       xAttrs.add(ezXAttr);
@@ -183,7 +184,7 @@ final class FSDirEncryptionZoneOp {
     final EncryptionZone ret;
     fsd.readLock();
     try {
-      iip = fsd.resolvePath(pc, srcArg);
+      iip = fsd.resolvePath(pc, srcArg, DirOp.READ);
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.READ);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
index 25b3155..1f3b135 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -77,7 +78,7 @@ final class FSDirErasureCodingOp {
     List<XAttr> xAttrs;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src, false);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
       src = iip.getPath();
       xAttrs = createErasureCodingPolicyXAttr(fsn, iip, ecPolicy);
     } finally {
@@ -223,7 +224,7 @@ final class FSDirErasureCodingOp {
       final String srcArg) throws IOException {
     final FSDirectory fsd = fsn.getFSDirectory();
     final FSPermissionChecker pc = fsn.getPermissionChecker();
-    INodesInPath iip = fsd.resolvePath(pc, srcArg);
+    INodesInPath iip = fsd.resolvePath(pc, srcArg, DirOp.READ);
     if (fsn.isPermissionEnabled()) {
       fsn.getFSDirectory().checkPathAccess(pc, iip, FsAction.READ);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index 4d8d7d7..6f7c5eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -29,7 +29,9 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.security.AccessControlException;
 
 import java.io.IOException;
 import java.util.List;
@@ -43,17 +45,10 @@ class FSDirMkdirOp {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException(src);
-    }
     FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.writeLock();
     try {
-      INodesInPath iip = fsd.resolvePathForWrite(pc, src);
-      src = iip.getPath();
-      if (fsd.isPermissionEnabled()) {
-        fsd.checkTraverse(pc, iip);
-      }
+      INodesInPath iip = fsd.resolvePath(pc, src, DirOp.CREATE);
 
       final INode lastINode = iip.getLastINode();
       if (lastINode != null && lastINode.isFile()) {
@@ -159,9 +154,10 @@ class FSDirMkdirOp {
   static void mkdirForEditLog(FSDirectory fsd, long inodeId, String src,
       PermissionStatus permissions, List<AclEntry> aclEntries, long timestamp)
       throws QuotaExceededException, UnresolvedLinkException, AclException,
-      FileAlreadyExistsException {
+      FileAlreadyExistsException, ParentNotDirectoryException,
+      AccessControlException {
     assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath(src, false);
+    INodesInPath iip = fsd.getINodesInPath(src, DirOp.WRITE_LINK);
     final byte[] localName = iip.getLastLocalName();
     final INodesInPath existing = iip.getParentINodesInPath();
     Preconditions.checkState(existing.getLastINode() != null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index 12d5cfe..3beb3c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -24,12 +24,12 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -54,15 +54,12 @@ class FSDirRenameOp {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
     }
-    if (!DFSUtil.isValidName(dst)) {
-      throw new IOException("Invalid name: " + dst);
-    }
     FSPermissionChecker pc = fsd.getPermissionChecker();
 
     // Rename does not operate on link targets
     // Do not resolveLink when checking permissions of src and dst
-    INodesInPath srcIIP = fsd.resolvePathForWrite(pc, src, false);
-    INodesInPath dstIIP = fsd.resolvePathForWrite(pc, dst, false);
+    INodesInPath srcIIP = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
+    INodesInPath dstIIP = fsd.resolvePath(pc, dst, DirOp.CREATE_LINK);
     dstIIP = dstForRenameTo(srcIIP, dstIIP);
     return renameTo(fsd, pc, srcIIP, dstIIP, logRetryCache);
   }
@@ -115,8 +112,8 @@ class FSDirRenameOp {
   @Deprecated
   static INodesInPath renameForEditLog(FSDirectory fsd, String src, String dst,
       long timestamp) throws IOException {
-    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
-    INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    final INodesInPath srcIIP = fsd.getINodesInPath(src, DirOp.WRITE_LINK);
+    INodesInPath dstIIP = fsd.getINodesInPath(dst, DirOp.WRITE_LINK);
     // this is wrong but accidentally works.  the edit contains the full path
     // so the following will do nothing, but shouldn't change due to backward
     // compatibility when maybe full path wasn't logged.
@@ -242,9 +239,6 @@ class FSDirRenameOp {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options -" +
           " " + src + " to " + dst);
     }
-    if (!DFSUtil.isValidName(dst)) {
-      throw new InvalidPathException("Invalid name: " + dst);
-    }
     final FSPermissionChecker pc = fsd.getPermissionChecker();
 
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
@@ -260,8 +254,8 @@ class FSDirRenameOp {
       String src, String dst, BlocksMapUpdateInfo collectedBlocks,
       boolean logRetryCache,Options.Rename... options)
           throws IOException {
-    final INodesInPath srcIIP = fsd.resolvePathForWrite(pc, src, false);
-    final INodesInPath dstIIP = fsd.resolvePathForWrite(pc, dst, false);
+    final INodesInPath srcIIP = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
+    final INodesInPath dstIIP = fsd.resolvePath(pc, dst, DirOp.CREATE_LINK);
     if (fsd.isPermissionEnabled()) {
       boolean renameToTrash = false;
       if (null != options &&
@@ -330,8 +324,8 @@ class FSDirRenameOp {
       Options.Rename... options)
       throws IOException {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
-    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    final INodesInPath srcIIP = fsd.getINodesInPath(src, DirOp.WRITE_LINK);
+    final INodesInPath dstIIP = fsd.getINodesInPath(dst, DirOp.WRITE_LINK);
     unprotectedRenameTo(fsd, srcIIP, dstIIP, timestamp,
         collectedBlocks, options);
     if (!collectedBlocks.getToDeleteList().isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
index ad282d1..ff076e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
@@ -84,9 +85,9 @@ class FSDirSnapshotOp {
       FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
       String snapshotName, boolean logRetryCache)
       throws IOException {
-    final INodesInPath iip = fsd.getINodesInPath4Write(snapshotRoot);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    final INodesInPath iip = fsd.resolvePath(pc, snapshotRoot, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
       fsd.checkOwner(pc, iip);
     }
 
@@ -114,9 +115,9 @@ class FSDirSnapshotOp {
   static void renameSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
-    final INodesInPath iip = fsd.getINodesInPath4Write(path);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    final INodesInPath iip = fsd.resolvePath(pc, path, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
       fsd.checkOwner(pc, iip);
     }
     verifySnapshotName(fsd, snapshotNewName, path);
@@ -150,11 +151,11 @@ class FSDirSnapshotOp {
     final FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
+      INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ);
       if (fsd.isPermissionEnabled()) {
         checkSubtreeReadPermission(fsd, pc, path, fromSnapshot);
         checkSubtreeReadPermission(fsd, pc, path, toSnapshot);
       }
-      INodesInPath iip = fsd.getINodesInPath(path, true);
       diffs = snapshotManager.diff(iip, path, fromSnapshot, toSnapshot);
     } finally {
       fsd.readUnlock();
@@ -205,9 +206,9 @@ class FSDirSnapshotOp {
       FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
       String snapshotName, boolean logRetryCache)
       throws IOException {
-    final INodesInPath iip = fsd.getINodesInPath4Write(snapshotRoot);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    final INodesInPath iip = fsd.resolvePath(pc, snapshotRoot, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
       fsd.checkOwner(pc, iip);
     }
 
@@ -238,7 +239,7 @@ class FSDirSnapshotOp {
     final String fromPath = snapshot == null ?
         snapshottablePath : Snapshot.getSnapshotPath(snapshottablePath,
         snapshot);
-    INodesInPath iip = fsd.getINodesInPath(fromPath, true);
+    INodesInPath iip = fsd.resolvePath(pc, fromPath, DirOp.READ);
     fsd.checkPermission(pc, iip, false, null, null, FsAction.READ,
         FsAction.READ);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 5aa4dbc..ba7deec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -39,9 +38,11 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.security.AccessControlException;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -52,14 +53,8 @@ import static org.apache.hadoop.util.Time.now;
 class FSDirStatAndListingOp {
   static DirectoryListing getListingInt(FSDirectory fsd, final String srcArg,
       byte[] startAfter, boolean needLocation) throws IOException {
-    final INodesInPath iip;
-    if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
-      iip = fsd.resolvePath(pc, srcArg);
-    } else {
-      String src = FSDirectory.resolvePath(srcArg, fsd);
-      iip = fsd.getINodesInPath(src, true);
-    }
+    final FSPermissionChecker pc = fsd.getPermissionChecker();
+    final INodesInPath iip = fsd.resolvePath(pc, srcArg, DirOp.READ);
 
     // Get file name when startAfter is an INodePath.  This is not the
     // common case so avoid any unnecessary processing unless required.
@@ -80,11 +75,8 @@ class FSDirStatAndListingOp {
 
     boolean isSuperUser = true;
     if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
       if (iip.getLastINode() != null && iip.getLastINode().isDirectory()) {
         fsd.checkPathAccess(pc, iip, FsAction.READ_EXECUTE);
-      } else {
-        fsd.checkTraverse(pc, iip);
       }
       isSuperUser = pc.isSuperUser();
     }
@@ -104,18 +96,20 @@ class FSDirStatAndListingOp {
   static HdfsFileStatus getFileInfo(
       FSDirectory fsd, String srcArg, boolean resolveLink)
       throws IOException {
-    String src = srcArg;
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException("Invalid file name: " + src);
-    }
+    DirOp dirOp = resolveLink ? DirOp.READ : DirOp.READ_LINK;
+    FSPermissionChecker pc = fsd.getPermissionChecker();
     final INodesInPath iip;
-    if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
-      iip = fsd.resolvePath(pc, srcArg, resolveLink);
-      fsd.checkPermission(pc, iip, false, null, null, null, null, false);
+    if (pc.isSuperUser()) {
+      // superuser can only get an ACE if an existing ancestor is a file.
+      // right or (almost certainly) wrong, current fs contracts expect
+      // superuser to receive null instead.
+      try {
+        iip = fsd.resolvePath(pc, srcArg, dirOp);
+      } catch (AccessControlException ace) {
+        return null;
+      }
     } else {
-      src = FSDirectory.resolvePath(srcArg, fsd);
-      iip = fsd.getINodesInPath(src, resolveLink);
+      iip = fsd.resolvePath(pc, srcArg, dirOp);
     }
     return getFileInfo(fsd, iip);
   }
@@ -125,17 +119,14 @@ class FSDirStatAndListingOp {
    */
   static boolean isFileClosed(FSDirectory fsd, String src) throws IOException {
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    final INodesInPath iip = fsd.resolvePath(pc, src);
-    if (fsd.isPermissionEnabled()) {
-      fsd.checkTraverse(pc, iip);
-    }
+    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
     return !INodeFile.valueOf(iip.getLastINode(), src).isUnderConstruction();
   }
 
   static ContentSummary getContentSummary(
       FSDirectory fsd, String src) throws IOException {
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    final INodesInPath iip = fsd.resolvePath(pc, src, false);
+    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
     if (fsd.isPermissionEnabled()) {
       fsd.checkPermission(pc, iip, false, null, null, null,
           FsAction.READ_EXECUTE);
@@ -158,7 +149,7 @@ class FSDirStatAndListingOp {
     BlockManager bm = fsd.getBlockManager();
     fsd.readLock();
     try {
-      final INodesInPath iip = fsd.resolvePath(pc, src);
+      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
       src = iip.getPath();
       final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
       if (fsd.isPermissionEnabled()) {
@@ -537,7 +528,7 @@ class FSDirStatAndListingOp {
     final INodesInPath iip;
     fsd.readLock();
     try {
-      iip = fsd.resolvePath(pc, src, false);
+      iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
       if (fsd.isPermissionEnabled()) {
         fsd.checkPermission(pc, iip, false, null, null, null,
             FsAction.READ_EXECUTE);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java
index 71362f8..3b5f19d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 
 import java.io.IOException;
 
@@ -55,7 +56,7 @@ class FSDirSymlinkOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, link, false);
+      iip = fsd.resolvePath(pc, link, DirOp.WRITE_LINK);
       link = iip.getPath();
       if (!createParent) {
         fsd.verifyParentDir(iip);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
index 19518b4..f2a1ee5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 
@@ -77,7 +78,7 @@ final class FSDirTruncateOp {
     Block truncateBlock = null;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, srcArg);
+      iip = fsd.resolvePath(pc, srcArg, DirOp.WRITE);
       src = iip.getPath();
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
@@ -154,7 +155,7 @@ final class FSDirTruncateOp {
    * {@link FSDirTruncateOp#truncate}, this will not schedule block recovery.
    *
    * @param fsn namespace
-   * @param src path name
+   * @param iip resolved inodes in path; its last inode is the file
    * @param clientName client name
    * @param clientMachine client machine info
    * @param newLength the target file size
@@ -162,7 +163,8 @@ final class FSDirTruncateOp {
    * @param truncateBlock truncate block
    * @throws IOException
    */
-  static void unprotectedTruncate(final FSNamesystem fsn, final String src,
+  static void unprotectedTruncate(final FSNamesystem fsn,
+      final INodesInPath iip,
       final String clientName, final String clientMachine,
       final long newLength, final long mtime, final Block truncateBlock)
       throws UnresolvedLinkException, QuotaExceededException,
@@ -170,7 +172,6 @@ final class FSDirTruncateOp {
     assert fsn.hasWriteLock();
 
     FSDirectory fsd = fsn.getFSDirectory();
-    INodesInPath iip = fsd.getINodesInPath(src, true);
     INodeFile file = iip.getLastINode().asFile();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     boolean onBlockBoundary = unprotectedTruncate(fsn, iip, newLength,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index aab0f76..6467e09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
@@ -305,7 +306,7 @@ class FSDirWriteFileOp {
   static INodesInPath resolvePathForStartFile(FSDirectory dir,
       FSPermissionChecker pc, String src, EnumSet<CreateFlag> flag,
       boolean createParent) throws IOException {
-    INodesInPath iip = dir.resolvePathForWrite(pc, src);
+    INodesInPath iip = dir.resolvePath(pc, src, DirOp.CREATE);
     if (dir.isPermissionEnabled()) {
       dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index 6badf24..f676f36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.security.AccessControlException;
 
 import java.io.FileNotFoundException;
@@ -72,7 +73,7 @@ class FSDirXAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       checkXAttrChangeAccess(fsd, iip, xAttr, pc);
       unprotectedSetXAttrs(fsd, iip, xAttrs, flag);
@@ -94,7 +95,7 @@ class FSDirXAttrOp {
     if (!getAll) {
       XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs, isRawPath);
     }
-    final INodesInPath iip = fsd.resolvePath(pc, src);
+    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
     if (fsd.isPermissionEnabled()) {
       fsd.checkPathAccess(pc, iip, FsAction.READ);
     }
@@ -133,7 +134,7 @@ class FSDirXAttrOp {
     FSDirXAttrOp.checkXAttrsConfigFlag(fsd);
     final FSPermissionChecker pc = fsd.getPermissionChecker();
     final boolean isRawPath = FSDirectory.isReservedRawName(src);
-    final INodesInPath iip = fsd.resolvePath(pc, src);
+    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
     if (fsd.isPermissionEnabled()) {
       /* To access xattr names, you need EXECUTE in the owning directory. */
       fsd.checkParentAccess(pc, iip, FsAction.EXECUTE);
@@ -165,7 +166,7 @@ class FSDirXAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       checkXAttrChangeAccess(fsd, iip, xAttr, pc);
 
@@ -186,8 +187,7 @@ class FSDirXAttrOp {
       FSDirectory fsd, final String src, final List<XAttr> toRemove)
       throws IOException {
     assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath4Write(
-        FSDirectory.normalizePath(src), true);
+    INodesInPath iip = fsd.getINodesInPath(src, DirOp.WRITE);
     INode inode = FSDirectory.resolveLastINode(iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index a059ee5..b21442d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@@ -240,6 +242,17 @@ public class FSDirectory implements Closeable {
    */
   private final NameCache<ByteArray> nameCache;
 
+  // used to specify path resolution type. *_LINK will return symlinks instead
+  // of throwing an unresolved exception
+  public enum DirOp {
+    READ,
+    READ_LINK,
+    WRITE,  // disallows snapshot paths.
+    WRITE_LINK,
+    CREATE, // like write, but also blocks invalid path names.
+    CREATE_LINK;
+  };
+
   FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
     this.dirLock = new ReentrantReadWriteLock(true); // fair
     this.inodeId = new INodeId();
@@ -540,65 +553,73 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * This is a wrapper for resolvePath(). If the path passed
-   * is prefixed with /.reserved/raw, then it checks to ensure that the caller
-   * has super user privileges.
+   * Resolves a given path into an INodesInPath.  All ancestor inodes that
+   * exist are validated as traversable directories.  Symlinks in the ancestry
+   * will generate an UnresolvedLinkException.  The returned IIP will be an
+   * accessible path that also passed additional sanity checks based on how
+   * the path will be used as specified by the DirOp.
+   *   READ:   Expands reserved paths and performs permission checks
+   *           during traversal.  Raw paths are only accessible by a superuser.
+   *   WRITE:  In addition to READ checks, ensures the path is not a
+   *           snapshot path.
+   *   CREATE: In addition to WRITE checks, ensures path does not contain
+   *           illegal character sequences.
    *
-   * @param pc The permission checker used when resolving path.
-   * @param path The path to resolve.
+   * @param pc  A permission checker for traversal checks.  Pass null for
+   *            no permission checks.
+   * @param src The path to resolve.
+   * @param dirOp The {@link DirOp} that controls additional checks.
+   *         The *_LINK variants will return symlinks instead of throwing
+   *         an unresolved exception.
    * @return if the path indicates an inode, return path after replacing up to
    *         <inodeid> with the corresponding path of the inode, else the path
    *         in {@code src} as is. If the path refers to a path in the "raw"
    *         directory, return the non-raw pathname.
    * @throws FileNotFoundException
    * @throws AccessControlException
+   * @throws ParentNotDirectoryException
+   * @throws UnresolvedLinkException
    */
   @VisibleForTesting
-  public INodesInPath resolvePath(FSPermissionChecker pc, String src)
-      throws UnresolvedLinkException, FileNotFoundException,
-      AccessControlException {
-    return resolvePath(pc, src, true);
-  }
-
-  @VisibleForTesting
   public INodesInPath resolvePath(FSPermissionChecker pc, String src,
-      boolean resolveLink) throws UnresolvedLinkException,
-  FileNotFoundException, AccessControlException {
+      DirOp dirOp) throws UnresolvedLinkException, FileNotFoundException,
+      AccessControlException, ParentNotDirectoryException {
+    boolean isCreate = (dirOp == DirOp.CREATE || dirOp == DirOp.CREATE_LINK);
+    // prevent creation of new invalid paths
+    if (isCreate && !DFSUtil.isValidName(src)) {
+      throw new InvalidPathException("Invalid file name: " + src);
+    }
+
     byte[][] components = INode.getPathComponents(src);
     boolean isRaw = isReservedRawName(components);
     if (isPermissionEnabled && pc != null && isRaw) {
       pc.checkSuperuserPrivilege();
     }
     components = resolveComponents(components, this);
-    return INodesInPath.resolve(rootDir, components, isRaw, resolveLink);
-  }
-
-  INodesInPath resolvePathForWrite(FSPermissionChecker pc, String src)
-      throws UnresolvedLinkException, FileNotFoundException,
-      AccessControlException {
-    return resolvePathForWrite(pc, src, true);
-  }
-
-  INodesInPath resolvePathForWrite(FSPermissionChecker pc, String src,
-      boolean resolveLink) throws UnresolvedLinkException,
-  FileNotFoundException, AccessControlException {
-    INodesInPath iip = resolvePath(pc, src, resolveLink);
-    if (iip.isSnapshot()) {
-      throw new SnapshotAccessControlException(
-          "Modification on a read-only snapshot is disallowed");
+    INodesInPath iip = INodesInPath.resolve(rootDir, components, isRaw);
+    // verify all ancestors are dirs and traversable.  note that only
+    // methods that create new namespace items have the signature to throw
+    // PNDE
+    try {
+      checkTraverse(pc, iip, dirOp);
+    } catch (ParentNotDirectoryException pnde) {
+      if (!isCreate) {
+        throw new AccessControlException(pnde.getMessage());
+      }
+      throw pnde;
     }
     return iip;
   }
 
   INodesInPath resolvePath(FSPermissionChecker pc, String src, long fileId)
       throws UnresolvedLinkException, FileNotFoundException,
-      AccessControlException {
+      AccessControlException, ParentNotDirectoryException {
     // Older clients may not have given us an inode ID to work with.
     // In this case, we have to try to resolve the path and hope it
     // hasn't changed or been deleted since the file was opened for write.
     INodesInPath iip;
     if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
-      iip = resolvePath(pc, src);
+      iip = resolvePath(pc, src, DirOp.WRITE);
     } else {
       INode inode = getInode(fileId);
       if (inode == null) {
@@ -1607,63 +1628,57 @@ public class FSDirectory implements Closeable {
     return null;
   }
 
-  INodesInPath getExistingPathINodes(byte[][] components)
-      throws UnresolvedLinkException {
-    return INodesInPath.resolve(rootDir, components, false);
-  }
-
   /**
-   * Get {@link INode} associated with the file / directory.
+   * Resolves the given path into inodes.  Reserved paths are not handled and
+   * permissions are not verified.  Client supplied paths should be
+   * resolved via {@link #resolvePath(FSPermissionChecker, String, DirOp)}.
+   * This method should only be used by internal methods.
+   * @return the {@link INodesInPath} containing all inodes in the path.
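+   * @throws UnresolvedLinkException if the path contains an unresolved link
+   * @throws ParentNotDirectoryException if an ancestor is not a directory
+   * @throws AccessControlException if a non-read op targets a snapshot path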
    */
-  public INodesInPath getINodesInPath4Write(String src)
-      throws UnresolvedLinkException, SnapshotAccessControlException {
-    return getINodesInPath4Write(src, true);
+  public INodesInPath getINodesInPath(String src, DirOp dirOp)
+      throws UnresolvedLinkException, AccessControlException,
+      ParentNotDirectoryException {
+    return getINodesInPath(INode.getPathComponents(src), dirOp);
+  }
+
+  public INodesInPath getINodesInPath(byte[][] components, DirOp dirOp)
+      throws UnresolvedLinkException, AccessControlException,
+      ParentNotDirectoryException {
+    INodesInPath iip = INodesInPath.resolve(rootDir, components);
+    checkTraverse(null, iip, dirOp);
+    return iip;
   }
 
   /**
    * Get {@link INode} associated with the file / directory.
-   * @throws SnapshotAccessControlException if path is in RO snapshot
+   * Equivalent to {@link #getINode(String, DirOp)} with {@link DirOp#READ}.
    */
-  public INode getINode4Write(String src) throws UnresolvedLinkException,
-      SnapshotAccessControlException {
-    return getINodesInPath4Write(src, true).getLastINode();
-  }
-
-  /** @return the {@link INodesInPath} containing all inodes in the path. */
-  public INodesInPath getINodesInPath(String path, boolean resolveLink)
-      throws UnresolvedLinkException {
-    final byte[][] components = INode.getPathComponents(path);
-    return INodesInPath.resolve(rootDir, components, resolveLink);
-  }
-
-  /** @return the last inode in the path. */
-  INode getINode(String path, boolean resolveLink)
-      throws UnresolvedLinkException {
-    return getINodesInPath(path, resolveLink).getLastINode();
+  @VisibleForTesting // should be removed after a lot of tests are updated
+  public INode getINode(String src) throws UnresolvedLinkException,
+      AccessControlException, ParentNotDirectoryException {
+    return getINode(src, DirOp.READ);
   }
 
   /**
    * Get {@link INode} associated with the file / directory.
+   * Equivalent to {@link #getINode(String, DirOp)} with {@link DirOp#WRITE}.
    */
-  public INode getINode(String src) throws UnresolvedLinkException {
-    return getINode(src, true);
+  @VisibleForTesting // should be removed after a lot of tests are updated
+  public INode getINode4Write(String src) throws UnresolvedLinkException,
+      AccessControlException, FileNotFoundException,
+      ParentNotDirectoryException {
+    return getINode(src, DirOp.WRITE);
   }
 
   /**
-   * @return the INodesInPath of the components in src
-   * @throws UnresolvedLinkException if symlink can't be resolved
-   * @throws SnapshotAccessControlException if path is in RO snapshot
+   * Get {@link INode} associated with the file / directory.
    */
-  INodesInPath getINodesInPath4Write(String src, boolean resolveLink)
-          throws UnresolvedLinkException, SnapshotAccessControlException {
-    final byte[][] components = INode.getPathComponents(src);
-    INodesInPath inodesInPath = INodesInPath.resolve(rootDir, components,
-        resolveLink);
-    if (inodesInPath.isSnapshot()) {
-      throw new SnapshotAccessControlException(
-              "Modification on a read-only snapshot is disallowed");
-    }
-    return inodesInPath;
+  public INode getINode(String src, DirOp dirOp) throws UnresolvedLinkException,
+      AccessControlException, ParentNotDirectoryException {
+    return getINodesInPath(src, dirOp).getLastINode();
   }
 
   FSPermissionChecker getPermissionChecker()
@@ -1706,9 +1721,33 @@ public class FSDirectory implements Closeable {
     checkPermission(pc, iip, false, access, null, null, null);
   }
 
-  void checkTraverse(FSPermissionChecker pc, INodesInPath iip)
-      throws AccessControlException {
-    checkPermission(pc, iip, false, null, null, null, null);
+  void checkTraverse(FSPermissionChecker pc, INodesInPath iip,
+      boolean resolveLink) throws AccessControlException,
+        UnresolvedPathException, ParentNotDirectoryException {
+    FSPermissionChecker.checkTraverse(
+        isPermissionEnabled ? pc : null, iip, resolveLink);
+  }
+
+  void checkTraverse(FSPermissionChecker pc, INodesInPath iip,
+      DirOp dirOp) throws AccessControlException, UnresolvedPathException,
+          ParentNotDirectoryException {
+    final boolean resolveLink;
+    switch (dirOp) {
+      case READ_LINK:
+      case WRITE_LINK:
+      case CREATE_LINK:
+        resolveLink = false;
+        break;
+      default:
+        resolveLink = true;
+        break;
+    }
+    checkTraverse(pc, iip, resolveLink);
+    boolean allowSnapshot = (dirOp == DirOp.READ || dirOp == DirOp.READ_LINK);
+    if (!allowSnapshot && iip.isSnapshot()) {
+      throw new SnapshotAccessControlException(
+          "Modification on a read-only snapshot is disallowed");
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 8abdba8..9f8687b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
@@ -352,7 +353,7 @@ public class FSEditLogLoader {
       // 3. OP_ADD to open file for append (old append)
 
       // See if the file already exists (persistBlocks call)
-      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
       if (oldFile != null && addCloseOp.overwrite) {
         // This is OP_ADD with overwrite
@@ -430,7 +431,7 @@ public class FSEditLogLoader {
             " clientMachine " + addCloseOp.clientMachine);
       }
 
-      final INodesInPath iip = fsDir.getINodesInPath(path, true);
+      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
       final INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
 
       // Update the salient file attributes.
@@ -468,7 +469,7 @@ public class FSEditLogLoader {
             " clientMachine " + appendOp.clientMachine +
             " newBlock " + appendOp.newBlock);
       }
-      INodesInPath iip = fsDir.getINodesInPath4Write(path);
+      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
       INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
       if (!file.isUnderConstruction()) {
         LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
@@ -492,7 +493,7 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + path +
             " numblocks : " + updateOp.blocks.length);
       }
-      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // Update in-memory data structures
       ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
@@ -511,7 +512,7 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + path +
             " new block id : " + addBlockOp.getLastBlock().getBlockId());
       }
-      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // add the new block to the INodeFile
       ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
@@ -523,7 +524,7 @@ public class FSEditLogLoader {
       SetReplicationOp setReplicationOp = (SetReplicationOp)op;
       String src = renameReservedPathsOnUpgrade(
           setReplicationOp.path, logVersion);
-      INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       short replication = fsNamesys.getBlockManager().adjustReplication(
           setReplicationOp.replication);
       FSDirAttrOp.unprotectedSetReplication(fsDir, iip, replication);
@@ -537,10 +538,10 @@ public class FSEditLogLoader {
         srcs[i] =
             renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
       }
-      INodesInPath targetIIP = fsDir.getINodesInPath4Write(trg);
+      INodesInPath targetIIP = fsDir.getINodesInPath(trg, DirOp.WRITE);
       INodeFile[] srcFiles = new INodeFile[srcs.length];
       for (int i = 0; i < srcs.length; i++) {
-        INodesInPath srcIIP = fsDir.getINodesInPath4Write(srcs[i]);
+        INodesInPath srcIIP = fsDir.getINodesInPath(srcs[i], DirOp.WRITE);
         srcFiles[i] = srcIIP.getLastINode().asFile();
       }
       FSDirConcatOp.unprotectedConcat(fsDir, targetIIP, srcFiles,
@@ -567,7 +568,7 @@ public class FSEditLogLoader {
       DeleteOp deleteOp = (DeleteOp)op;
       final String src = renameReservedPathsOnUpgrade(
           deleteOp.path, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src, false);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE_LINK);
       FSDirDeleteOp.deleteForEditLog(fsDir, iip, deleteOp.timestamp);
 
       if (toAddRetryCache) {
@@ -594,7 +595,7 @@ public class FSEditLogLoader {
       SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
       final String src =
           renameReservedPathsOnUpgrade(setPermissionsOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetPermission(fsDir, iip,
           setPermissionsOp.permissions);
       break;
@@ -603,7 +604,7 @@ public class FSEditLogLoader {
       SetOwnerOp setOwnerOp = (SetOwnerOp)op;
       final String src = renameReservedPathsOnUpgrade(
           setOwnerOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetOwner(fsDir, iip,
           setOwnerOp.username, setOwnerOp.groupname);
       break;
@@ -612,7 +613,7 @@ public class FSEditLogLoader {
       SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
       final String src = renameReservedPathsOnUpgrade(
           setNSQuotaOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
           setNSQuotaOp.nsQuota, HdfsConstants.QUOTA_DONT_SET, null);
       break;
@@ -621,7 +622,7 @@ public class FSEditLogLoader {
       ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
       final String src = renameReservedPathsOnUpgrade(
           clearNSQuotaOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
           HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_DONT_SET, null);
       break;
@@ -630,7 +631,7 @@ public class FSEditLogLoader {
       SetQuotaOp setQuotaOp = (SetQuotaOp) op;
       final String src = renameReservedPathsOnUpgrade(
           setQuotaOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
           setQuotaOp.nsQuota, setQuotaOp.dsQuota, null);
       break;
@@ -640,7 +641,7 @@ public class FSEditLogLoader {
           (FSEditLogOp.SetQuotaByStorageTypeOp) op;
       final String src = renameReservedPathsOnUpgrade(
           setQuotaByStorageTypeOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
           HdfsConstants.QUOTA_DONT_SET, setQuotaByStorageTypeOp.dsQuota,
           setQuotaByStorageTypeOp.type);
@@ -650,7 +651,7 @@ public class FSEditLogLoader {
       TimesOp timesOp = (TimesOp)op;
       final String src = renameReservedPathsOnUpgrade(
           timesOp.path, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetTimes(fsDir, iip,
           timesOp.mtime, timesOp.atime, true);
       break;
@@ -664,7 +665,7 @@ public class FSEditLogLoader {
           lastInodeId);
       final String path = renameReservedPathsOnUpgrade(symlinkOp.path,
           logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath(path, false);
+      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE_LINK);
       FSDirSymlinkOp.unprotectedAddSymlink(fsDir, iip.getExistingINodes(),
           iip.getLastLocalName(), inodeId, symlinkOp.value, symlinkOp.mtime,
           symlinkOp.atime, symlinkOp.permissionStatus);
@@ -724,7 +725,7 @@ public class FSEditLogLoader {
           reassignLeaseOp.leaseHolder);
       final String path =
           renameReservedPathsOnUpgrade(reassignLeaseOp.path, logVersion);
-      INodeFile pendingFile = fsDir.getINode(path).asFile();
+      INodeFile pendingFile = fsDir.getINode(path, DirOp.READ).asFile();
       Preconditions.checkState(pendingFile.isUnderConstruction());
       fsNamesys.reassignLeaseInternal(lease, reassignLeaseOp.newHolder,
               pendingFile);
@@ -740,7 +741,7 @@ public class FSEditLogLoader {
       final String snapshotRoot =
           renameReservedPathsOnUpgrade(createSnapshotOp.snapshotRoot,
               logVersion);
-      INodesInPath iip = fsDir.getINodesInPath4Write(snapshotRoot);
+      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
       String path = fsNamesys.getSnapshotManager().createSnapshot(iip,
           snapshotRoot, createSnapshotOp.snapshotName);
       if (toAddRetryCache) {
@@ -756,7 +757,7 @@ public class FSEditLogLoader {
       final String snapshotRoot =
           renameReservedPathsOnUpgrade(deleteSnapshotOp.snapshotRoot,
               logVersion);
-      INodesInPath iip = fsDir.getINodesInPath4Write(snapshotRoot);
+      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
       fsNamesys.getSnapshotManager().deleteSnapshot(iip,
           deleteSnapshotOp.snapshotName,
           new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(),
@@ -778,7 +779,7 @@ public class FSEditLogLoader {
       final String snapshotRoot =
           renameReservedPathsOnUpgrade(renameSnapshotOp.snapshotRoot,
               logVersion);
-      INodesInPath iip = fsDir.getINodesInPath4Write(snapshotRoot);
+      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
       fsNamesys.getSnapshotManager().renameSnapshot(iip,
           snapshotRoot, renameSnapshotOp.snapshotOldName,
           renameSnapshotOp.snapshotNewName);
@@ -907,13 +908,13 @@ public class FSEditLogLoader {
     }
     case OP_SET_ACL: {
       SetAclOp setAclOp = (SetAclOp) op;
-      FSDirAclOp.unprotectedSetAcl(fsDir, setAclOp.src, setAclOp.aclEntries,
-          true);
+      INodesInPath iip = fsDir.getINodesInPath(setAclOp.src, DirOp.WRITE);
+      FSDirAclOp.unprotectedSetAcl(fsDir, iip, setAclOp.aclEntries, true);
       break;
     }
     case OP_SET_XATTR: {
       SetXAttrOp setXAttrOp = (SetXAttrOp) op;
-      INodesInPath iip = fsDir.getINodesInPath4Write(setXAttrOp.src);
+      INodesInPath iip = fsDir.getINodesInPath(setXAttrOp.src, DirOp.WRITE);
       FSDirXAttrOp.unprotectedSetXAttrs(fsDir, iip,
                                         setXAttrOp.xAttrs,
                                         EnumSet.of(XAttrSetFlag.CREATE,
@@ -935,7 +936,8 @@ public class FSEditLogLoader {
     }
     case OP_TRUNCATE: {
       TruncateOp truncateOp = (TruncateOp) op;
-      FSDirTruncateOp.unprotectedTruncate(fsNamesys, truncateOp.src,
+      INodesInPath iip = fsDir.getINodesInPath(truncateOp.src, DirOp.WRITE);
+      FSDirTruncateOp.unprotectedTruncate(fsNamesys, iip,
           truncateOp.clientName, truncateOp.clientMachine,
           truncateOp.newLength, truncateOp.timestamp, truncateOp.truncateBlock);
       break;
@@ -944,7 +946,7 @@ public class FSEditLogLoader {
       SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
       final String path = renameReservedPathsOnUpgrade(setStoragePolicyOp.path,
           logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(path);
+      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetStoragePolicy(
           fsDir, fsNamesys.getBlockManager(), iip,
           setStoragePolicyOp.policyId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index e4263bd..ffa6bca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -24,7 +24,6 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.security.DigestInputStream;
@@ -44,8 +43,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathIsNotDirectoryException;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -59,6 +56,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@@ -600,7 +598,7 @@ public class FSImageFormat {
      // Rename .snapshot paths if we're doing an upgrade
      parentPath = renameReservedPathsOnUpgrade(parentPath, getLayoutVersion());
      final INodeDirectory parent = INodeDirectory.valueOf(
-         namesystem.dir.getINode(parentPath, true), parentPath);
+         namesystem.dir.getINode(parentPath, DirOp.READ), parentPath);
      return loadChildren(parent, in, counter);
    }
 
@@ -651,15 +649,14 @@ public class FSImageFormat {
     }
   }
 
-  private INodeDirectory getParentINodeDirectory(byte[][] pathComponents
-      ) throws FileNotFoundException, PathIsNotDirectoryException,
-      UnresolvedLinkException {
+  private INodeDirectory getParentINodeDirectory(byte[][] pathComponents)
+      throws IOException {
     if (pathComponents.length < 2) { // root
       return null;
     }
     // Gets the parent INode
-    final INodesInPath inodes = namesystem.dir.getExistingPathINodes(
-        pathComponents);
+    final INodesInPath inodes =
+        namesystem.dir.getINodesInPath(pathComponents, DirOp.WRITE);
     return INodeDirectory.valueOf(inodes.getINode(-2), pathComponents);
   }
 
@@ -954,7 +951,7 @@ public class FSImageFormat {
           inSnapshot = true;
         } else {
           path = renameReservedPathsOnUpgrade(path, getLayoutVersion());
-          final INodesInPath iip = fsDir.getINodesInPath(path, true);
+          final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
           oldnode = INodeFile.valueOf(iip.getLastINode(), path);
         }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d175853/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index eb870f8..88c7681 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -219,6 +219,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSDirEncryptionZoneOp.EncryptionKeyInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
@@ -1796,7 +1797,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
          * HDFS-7463. A better fix is to change the edit log of SetTime to
          * use inode id instead of a path.
          */
-        final INodesInPath iip = dir.resolvePath(pc, srcArg);
+        final INodesInPath iip = dir.resolvePath(pc, srcArg, DirOp.READ);
         src = iip.getPath();
 
         INode inode = iip.getLastINode();
@@ -2270,10 +2271,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   boolean recoverLease(String src, String holder, String clientMachine)
       throws IOException {
-    if (!DFSUtil.isValidName(src)) {
-      throw new IOException("Invalid file name: " + src);
-    }
-  
     boolean skipSync = false;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -2281,7 +2278,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot recover the lease of " + src);
-      final INodesInPath iip = dir.resolvePathForWrite(pc, src);
+      final INodesInPath iip = dir.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
       if (!inode.isUnderConstruction()) {
@@ -3283,12 +3280,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String fullName = bc.getName();
     try {
       if (fullName != null && fullName.startsWith(Path.SEPARATOR)
-          && dir.getINode(fullName) == bc) {
+          && dir.getINode(fullName, DirOp.READ) == bc) {
         // If file exists in normal path then no need to look in snapshot
         return false;
       }
-    } catch (UnresolvedLinkException e) {
-      LOG.error("Error while resolving the link : " + fullName, e);
+    } catch (IOException e) {
+      // the snapshot path and current path may contain symlinks, ancestor
+      // dirs replaced by files, etc.
+      LOG.error("Error while resolving the path : " + fullName, e);
       return false;
     }
     /*
@@ -5698,7 +5697,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     List<DirectorySnapshottableFeature> lsf = new ArrayList<>();
     if (snapshottableDirs != null) {
       for (String snap : snapshottableDirs) {
-        final INode isnap = getFSDirectory().getINode(snap, false);
+        final INode isnap = getFSDirectory().getINode(snap, DirOp.READ_LINK);
         final DirectorySnapshottableFeature sf =
             isnap.asDirectory().getDirectorySnapshottableFeature();
         if (sf == null) {
@@ -6791,7 +6790,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      final INodesInPath iip = dir.resolvePath(pc, src);
+      final INodesInPath iip = dir.resolvePath(pc, src, DirOp.READ);
       src = iip.getPath();
       INode inode = iip.getLastINode();
       if (inode == null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org