Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/06/18 20:25:21 UTC

[43/50] [abbrv] hadoop git commit: HDFS-8446. Separate safemode related operations in GetBlockLocations(). Contributed by Haohui Mai.

HDFS-8446. Separate safemode related operations in GetBlockLocations(). Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/563aa169
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/563aa169
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/563aa169

Branch: refs/heads/YARN-2928
Commit: 563aa1695fb08a4a42482733dc323221c4dab583
Parents: c033209
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Jun 17 16:21:37 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Thu Jun 18 11:19:01 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSDirAttrOp.java       |   3 +-
 .../server/namenode/FSDirStatAndListingOp.java  |  93 +++++++++--
 .../hdfs/server/namenode/FSDirectory.java       |  20 +++
 .../hdfs/server/namenode/FSNamesystem.java      | 153 +++----------------
 .../hdfs/server/namenode/NamenodeFsck.java      |   5 +-
 .../hadoop/hdfs/server/namenode/TestFsck.java   |  16 +-
 7 files changed, 140 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
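
A note on the shape of the change before the hunks: the body of getBlockLocations() moves out of FSNamesystem into the static helper FSDirStatAndListingOp.getBlockLocations(), and only the safemode/HA policy stays behind in FSNamesystem. A minimal sketch of the resulting call flow, simplified from the hunks below (locking, auditing and error handling elided):

    // FSNamesystem (RPC layer): policy only.
    GetBlockLocationsResult res = FSDirStatAndListingOp.getBlockLocations(
        dir, pc, srcArg, offset, length, /* needBlockToken */ true);
    // safemode check happens here, against res.blocks (see the FSNamesystem hunk)
    // access time update also happens here, gated on res.updateAccessTime()

The directory-level helper does path resolution, permission checks and the block lookup under fsd.readLock(); everything that depends on NameNode-wide state (safemode, HA) stays in the caller.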


http://git-wip-us.apache.org/repos/asf/hadoop/blob/563aa169/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6ef405b..6dfcd18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -647,6 +647,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8238. Move ClientProtocol to the hdfs-client.
     (Takanobu Asanuma via wheat9)
 
+    HDFS-8446. Separate safemode related operations in GetBlockLocations().
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/563aa169/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 3b07320..b322b69 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -474,8 +474,7 @@ public class FSDirAttrOp {
 
       // if the last access time update was within the last precision interval, then
       // no need to store access time
-      if (atime <= inodeTime + fsd.getFSNamesystem().getAccessTimePrecision()
-          && !force) {
+      if (atime <= inodeTime + fsd.getAccessTimePrecision() && !force) {
         status =  false;
       } else {
         inode.setAccessTime(atime, latest);
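
The only behavioral point in this hunk is where the precision comes from: FSDirectory now exposes getAccessTimePrecision() directly instead of routing through FSNamesystem. For reference, the skip rule works out as follows; this is a standalone worked example, not part of the patch, and it assumes the default dfs.namenode.accesstime.precision of 3600000 ms (one hour):

    public class AtimePrecisionExample {
      public static void main(String[] args) {
        long precision = 3600000L;   // assumed default dfs.namenode.accesstime.precision
        long inodeTime = 0L;         // last persisted access time
        // a read 30 minutes later is within the precision window: atime not rewritten
        boolean skip30 = 30L * 60 * 1000 <= inodeTime + precision;  // true
        // a read 90 minutes later falls outside the window: setAccessTime() runs
        boolean skip90 = 90L * 60 * 1000 <= inodeTime + precision;  // false
        System.out.println("30 min read skips atime update: " + skip30);
        System.out.println("90 min read skips atime update: " + skip90);
      }
    }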

http://git-wip-us.apache.org/repos/asf/hadoop/blob/563aa169/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index c636d93..201dabc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -43,6 +45,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 
+import static org.apache.hadoop.util.Time.now;
+
 class FSDirStatAndListingOp {
   static DirectoryListing getListingInt(FSDirectory fsd, final String srcArg,
       byte[] startAfter, boolean needLocation) throws IOException {
@@ -137,9 +141,69 @@ class FSDirStatAndListingOp {
     return getContentSummaryInt(fsd, iip);
   }
 
+  /**
+   * Get block locations within the specified range.
+   * @see ClientProtocol#getBlockLocations(String, long, long)
+   * @throws IOException
+   */
+  static GetBlockLocationsResult getBlockLocations(
+      FSDirectory fsd, FSPermissionChecker pc, String src, long offset,
+      long length, boolean needBlockToken) throws IOException {
+    Preconditions.checkArgument(offset >= 0,
+        "Negative offset is not supported. File: " + src);
+    Preconditions.checkArgument(length >= 0,
+        "Negative length is not supported. File: " + src);
+    CacheManager cm = fsd.getFSNamesystem().getCacheManager();
+    BlockManager bm = fsd.getBlockManager();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    boolean isReservedName = FSDirectory.isReservedRawName(src);
+    fsd.readLock();
+    try {
+      src = fsd.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = fsd.getINodesInPath(src, true);
+      final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkPathAccess(pc, iip, FsAction.READ);
+        fsd.checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
+      }
+
+      final long fileSize = iip.isSnapshot()
+          ? inode.computeFileSize(iip.getPathSnapshotId())
+          : inode.computeFileSizeNotIncludingLastUcBlock();
+
+      boolean isUc = inode.isUnderConstruction();
+      if (iip.isSnapshot()) {
+        // if src indicates a snapshot file, we need to make sure the returned
+        // blocks do not exceed the size of the snapshot file.
+        length = Math.min(length, fileSize - offset);
+        isUc = false;
+      }
+
+      final FileEncryptionInfo feInfo = isReservedName ? null
+          : fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
+
+      final LocatedBlocks blocks = bm.createLocatedBlocks(
+          inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
+          length, needBlockToken, iip.isSnapshot(), feInfo);
+
+      // Set caching information for the located blocks.
+      for (LocatedBlock lb : blocks.getLocatedBlocks()) {
+        cm.setCachedLocations(lb);
+      }
+
+      final long now = now();
+      boolean updateAccessTime = fsd.isAccessTimeSupported()
+          && !iip.isSnapshot()
+          && now > inode.getAccessTime() + fsd.getAccessTimePrecision();
+      return new GetBlockLocationsResult(updateAccessTime, blocks);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
   private static byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
-    return inodePolicy != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED ? inodePolicy :
-        parentPolicy;
+    return inodePolicy != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
+        ? inodePolicy : parentPolicy;
   }
 
   /**
@@ -294,13 +358,11 @@ class FSDirStatAndListingOp {
       byte policyId = includeStoragePolicy && !i.isSymlink() ?
           i.getStoragePolicyID() :
           HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      INodeAttributes nodeAttrs = getINodeAttributes(
-          fsd, path, HdfsFileStatus.EMPTY_NAME, i, src.getPathSnapshotId());
-      return createFileStatus(
-          fsd, HdfsFileStatus.EMPTY_NAME,
-          i, nodeAttrs, policyId,
-          src.getPathSnapshotId(),
-          isRawPath, src);
+      INodeAttributes nodeAttrs = getINodeAttributes(fsd, path,
+                                                     HdfsFileStatus.EMPTY_NAME,
+                                                     i, src.getPathSnapshotId());
+      return createFileStatus(fsd, HdfsFileStatus.EMPTY_NAME, i, nodeAttrs,
+                              policyId, src.getPathSnapshotId(), isRawPath, src);
     } finally {
       fsd.readUnlock();
     }
@@ -520,4 +582,17 @@ class FSDirStatAndListingOp {
       fsd.readUnlock();
     }
   }
+
+  static class GetBlockLocationsResult {
+    final boolean updateAccessTime;
+    final LocatedBlocks blocks;
+    boolean updateAccessTime() {
+      return updateAccessTime;
+    }
+    private GetBlockLocationsResult(
+        boolean updateAccessTime, LocatedBlocks blocks) {
+      this.updateAccessTime = updateAccessTime;
+      this.blocks = blocks;
+    }
+  }
 }
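
GetBlockLocationsResult (moved here from FSNamesystem, see below) is deliberately side-effect free: the helper no longer bumps the access time itself, it only reports through updateAccessTime() whether the caller should. That keeps all the work under fsd.readLock() read-only and leaves the decision of whether a write is currently allowed to the namesystem. A sketch of the intended caller pattern, condensed from the FSNamesystem hunk below:

    GetBlockLocationsResult res = FSDirStatAndListingOp.getBlockLocations(
        fsd, pc, src, offset, length, /* needBlockToken */ true);
    if (!isInSafeMode() && res.updateAccessTime()) {
      // re-resolve the path and apply FSDirAttrOp.setTimes(...) under the write lock
    }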

http://git-wip-us.apache.org/repos/asf/hadoop/blob/563aa169/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 5b8650e..c807fba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -80,6 +80,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DE
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
 import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
 
 /**
@@ -92,6 +93,7 @@ import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_S
 @InterfaceAudience.Private
 public class FSDirectory implements Closeable {
   static final Logger LOG = LoggerFactory.getLogger(FSDirectory.class);
+
   private static INodeDirectory createRoot(FSNamesystem namesystem) {
     final INodeDirectory r = new INodeDirectory(
         INodeId.ROOT_INODE_ID,
@@ -328,6 +330,9 @@ public class FSDirectory implements Closeable {
   boolean isAccessTimeSupported() {
     return accessTimePrecision > 0;
   }
+  long getAccessTimePrecision() {
+    return accessTimePrecision;
+  }
   boolean isQuotaByStorageTypeEnabled() {
     return quotaByStorageTypeEnabled;
   }
@@ -1550,6 +1555,21 @@ public class FSDirectory implements Closeable {
     }
   }
 
+  void checkUnreadableBySuperuser(
+      FSPermissionChecker pc, INode inode, int snapshotId)
+      throws IOException {
+    if (pc.isSuperUser()) {
+      for (XAttr xattr : FSDirXAttrOp.getXAttrs(this, inode, snapshotId)) {
+        if (XAttrHelper.getPrefixName(xattr).
+            equals(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER)) {
+          throw new AccessControlException(
+              "Access is denied for " + pc.getUser() + " since the superuser "
+              + "is not allowed to perform this operation.");
+        }
+      }
+    }
+  }
+
   HdfsFileStatus getAuditFileInfo(INodesInPath iip)
       throws IOException {
     return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
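
checkUnreadableBySuperuser() moves essentially verbatim from FSNamesystem into FSDirectory so the directory-level getBlockLocations() can enforce it. For context, the xattr it looks for is set on a file to fence the superuser out of its data. A hedged usage example follows; it is not part of this patch, assumes the standard client xattr API (org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path), and assumes a null value is accepted for this value-less xattr:

    FileSystem fs = FileSystem.get(new Configuration());
    // this xattr takes no value and is write-once
    fs.setXAttr(new Path("/user/alice/secret"),
        "security.hdfs.unreadable.by.superuser", null);
    // a later getBlockLocations() as the superuser now fails in
    // checkUnreadableBySuperuser() with AccessControlException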

http://git-wip-us.apache.org/repos/asf/hadoop/blob/563aa169/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b97776a..d82da93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -34,8 +34,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEF
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY;
@@ -87,7 +85,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
+import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
@@ -169,7 +167,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
-import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -484,9 +481,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private final long minBlockSize;         // minimum block size
   final long maxBlocksPerFile;     // maximum # of blocks per file
 
-  // precision of access times.
-  private final long accessTimePrecision;
-
   /** Lock to protect FSNamesystem. */
   private final FSNamesystemLock fsLock;
 
@@ -800,8 +794,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
       this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
           DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
-      this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
-          DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT);
 
       this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
       
@@ -1631,14 +1623,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return serverDefaults;
   }
 
-  long getAccessTimePrecision() {
-    return accessTimePrecision;
-  }
-
-  private boolean isAccessTimeSupported() {
-    return accessTimePrecision > 0;
-  }
-
   /////////////////////////////////////////////////////////
   //
   // These methods are called by HadoopFS clients
@@ -1689,19 +1673,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     logAuditEvent(true, "setOwner", src, null, auditStat);
   }
 
-  static class GetBlockLocationsResult {
-    final boolean updateAccessTime;
-    final LocatedBlocks blocks;
-    boolean updateAccessTime() {
-      return updateAccessTime;
-    }
-    private GetBlockLocationsResult(
-        boolean updateAccessTime, LocatedBlocks blocks) {
-      this.updateAccessTime = updateAccessTime;
-      this.blocks = blocks;
-    }
-  }
-
   /**
    * Get block locations within the specified range.
    * @see ClientProtocol#getBlockLocations(String, long, long)
@@ -1714,7 +1685,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      res = getBlockLocations(pc, srcArg, offset, length, true, true);
+      res = FSDirStatAndListingOp.getBlockLocations(
+          dir, pc, srcArg, offset, length, true);
+      if (isInSafeMode()) {
+        for (LocatedBlock b : res.blocks.getLocatedBlocks()) {
+          // if safemode & no block locations yet then throw safemodeException
+          if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
+            SafeModeException se = newSafemodeException(
+                "Zero blocklocations for " + srcArg);
+            if (haEnabled && haContext != null &&
+                haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
+              throw new RetriableException(se);
+            } else {
+              throw se;
+            }
+          }
+        }
+      }
     } catch (AccessControlException e) {
       logAuditEvent(false, "open", srcArg);
       throw e;
@@ -1724,7 +1711,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     logAuditEvent(true, "open", srcArg);
 
-    if (res.updateAccessTime()) {
+    if (!isInSafeMode() && res.updateAccessTime()) {
       byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(
           srcArg);
       String src = srcArg;
@@ -1754,7 +1741,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         final INodesInPath iip = dir.getINodesInPath(src, true);
         INode inode = iip.getLastINode();
         boolean updateAccessTime = inode != null &&
-            now > inode.getAccessTime() + getAccessTimePrecision();
+            now > inode.getAccessTime() + dir.getAccessTimePrecision();
         if (!isInSafeMode() && updateAccessTime) {
           boolean changed = FSDirAttrOp.setTimes(dir,
               inode, -1, now, false, iip.getLatestSnapshotId());
@@ -1786,88 +1773,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
-   * Get block locations within the specified range.
-   * @see ClientProtocol#getBlockLocations(String, long, long)
-   * @throws IOException
-   */
-  GetBlockLocationsResult getBlockLocations(
-      FSPermissionChecker pc, String src, long offset, long length,
-      boolean needBlockToken, boolean checkSafeMode) throws IOException {
-    if (offset < 0) {
-      throw new HadoopIllegalArgumentException(
-          "Negative offset is not supported. File: " + src);
-    }
-    if (length < 0) {
-      throw new HadoopIllegalArgumentException(
-          "Negative length is not supported. File: " + src);
-    }
-    final GetBlockLocationsResult ret = getBlockLocationsInt(
-        pc, src, offset, length, needBlockToken);
-
-    if (checkSafeMode && isInSafeMode()) {
-      for (LocatedBlock b : ret.blocks.getLocatedBlocks()) {
-        // if safemode & no block locations yet then throw safemodeException
-        if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
-          SafeModeException se = newSafemodeException(
-              "Zero blocklocations for " + src);
-          if (haEnabled && haContext != null &&
-              haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
-            throw new RetriableException(se);
-          } else {
-            throw se;
-          }
-        }
-      }
-    }
-    return ret;
-  }
-
-  private GetBlockLocationsResult getBlockLocationsInt(
-      FSPermissionChecker pc, final String srcArg, long offset, long length,
-      boolean needBlockToken)
-      throws IOException {
-    String src = srcArg;
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    src = dir.resolvePath(pc, srcArg, pathComponents);
-    final INodesInPath iip = dir.getINodesInPath(src, true);
-    final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
-    if (isPermissionEnabled) {
-      dir.checkPathAccess(pc, iip, FsAction.READ);
-      checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
-    }
-
-    final long fileSize = iip.isSnapshot()
-        ? inode.computeFileSize(iip.getPathSnapshotId())
-        : inode.computeFileSizeNotIncludingLastUcBlock();
-    boolean isUc = inode.isUnderConstruction();
-    if (iip.isSnapshot()) {
-      // if src indicates a snapshot file, we need to make sure the returned
-      // blocks do not exceed the size of the snapshot file.
-      length = Math.min(length, fileSize - offset);
-      isUc = false;
-    }
-
-    final FileEncryptionInfo feInfo =
-        FSDirectory.isReservedRawName(srcArg) ? null
-            : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
-
-    final LocatedBlocks blocks = blockManager.createLocatedBlocks(
-        inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
-        length, needBlockToken, iip.isSnapshot(), feInfo);
-
-    // Set caching information for the located blocks.
-    for (LocatedBlock lb : blocks.getLocatedBlocks()) {
-      cacheManager.setCachedLocations(lb);
-    }
-
-    final long now = now();
-    boolean updateAccessTime = isAccessTimeSupported() && !isInSafeMode()
-        && !iip.isSnapshot()
-        && now > inode.getAccessTime() + getAccessTimePrecision();
-    return new GetBlockLocationsResult(updateAccessTime, blocks);
-  }
-
-  /**
    * Moves all the blocks from {@code srcs} and appends them to {@code target}
    * To avoid rollbacks we will verify validity of ALL of the args
    * before we start actual move.
@@ -3912,8 +3817,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     readLock();
     try {
       checkOperation(NameNode.OperationCategory.READ);
-      dl = FSDirStatAndListingOp.getListingInt(dir, src, startAfter,
-          needLocation);
+      dl = getListingInt(dir, src, startAfter, needLocation);
     } catch (AccessControlException e) {
       logAuditEvent(false, "listStatus", src);
       throw e;
@@ -5309,21 +5213,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
 
-  private void checkUnreadableBySuperuser(FSPermissionChecker pc,
-      INode inode, int snapshotId)
-      throws IOException {
-    if (pc.isSuperUser()) {
-      for (XAttr xattr : FSDirXAttrOp.getXAttrs(dir, inode, snapshotId)) {
-        if (XAttrHelper.getPrefixName(xattr).
-            equals(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER)) {
-          throw new AccessControlException("Access is denied for " +
-              pc.getUser() + " since the superuser is not allowed to " +
-              "perform this operation.");
-        }
-      }
-    }
-  }
-
   @Override
   public void checkSuperuserPrivilege()
       throws AccessControlException {
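
Why the RetriableException branch survives in FSNamesystem: right after an HA failover the newly active NameNode can still be in safemode without enough block reports, so a read that would return zero locations is better retried by the client than failed outright. Condensed from the hunk above (names as in the patch):

    if (haEnabled && haContext != null
        && haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
      throw new RetriableException(se);   // client retry proxy retries the read
    } else {
      throw se;                           // non-HA setup: surface the SafeModeException
    }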

http://git-wip-us.apache.org/repos/asf/hadoop/blob/563aa169/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index f67d25a..7d4cd7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -483,8 +483,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
     final FSNamesystem fsn = namenode.getNamesystem();
     fsn.readLock();
     try {
-      blocks = fsn.getBlockLocations(
-          fsn.getPermissionChecker(), path, 0, fileLen, false, false)
+      blocks = FSDirStatAndListingOp.getBlockLocations(
+          fsn.getFSDirectory(), fsn.getPermissionChecker(),
+          path, 0, fileLen, false)
           .blocks;
     } catch (FileNotFoundException fnfe) {
       blocks = null;
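
fsck now reaches the helper directly, with needBlockToken=false; the old second boolean (checkSafeMode) is gone because the safemode gate lives only in the FSNamesystem path. The practical effect is unchanged: fsck keeps working while the NameNode is in safemode, as it already did with the old checkSafeMode=false argument. Annotated form of the new call:

    // needBlockToken=false: fsck reads metadata only, no data-transfer tokens,
    // and the helper applies no safemode gate on this path
    blocks = FSDirStatAndListingOp.getBlockLocations(
        fsn.getFSDirectory(), fsn.getPermissionChecker(),
        path, 0, fileLen, false).blocks;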

http://git-wip-us.apache.org/repos/asf/hadoop/blob/563aa169/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index eabd0c8..8818f17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -1159,20 +1159,21 @@ public class TestFsck {
     Configuration conf = new Configuration();
     NameNode namenode = mock(NameNode.class);
     NetworkTopology nettop = mock(NetworkTopology.class);
-    Map<String,String[]> pmap = new HashMap<String, String[]>();
+    Map<String,String[]> pmap = new HashMap<>();
     Writer result = new StringWriter();
     PrintWriter out = new PrintWriter(result, true);
     InetAddress remoteAddress = InetAddress.getLocalHost();
     FSNamesystem fsName = mock(FSNamesystem.class);
+    FSDirectory fsd = mock(FSDirectory.class);
     BlockManager blockManager = mock(BlockManager.class);
     DatanodeManager dnManager = mock(DatanodeManager.class);
+    INodesInPath iip = mock(INodesInPath.class);
 
     when(namenode.getNamesystem()).thenReturn(fsName);
-    when(fsName.getBlockLocations(any(FSPermissionChecker.class), anyString(),
-                                  anyLong(), anyLong(),
-                                  anyBoolean(), anyBoolean()))
-        .thenThrow(new FileNotFoundException());
     when(fsName.getBlockManager()).thenReturn(blockManager);
+    when(fsName.getFSDirectory()).thenReturn(fsd);
+    when(fsd.getFSNamesystem()).thenReturn(fsName);
+    when(fsd.getINodesInPath(anyString(), anyBoolean())).thenReturn(iip);
     when(blockManager.getDatanodeManager()).thenReturn(dnManager);
 
     NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
@@ -1190,8 +1191,7 @@ public class TestFsck {
     String owner = "foo";
     String group = "bar";
     byte [] symlink = null;
-    byte [] path = new byte[128];
-    path = DFSUtil.string2Bytes(pathString);
+    byte [] path = DFSUtil.string2Bytes(pathString);
     long fileId = 312321L;
     int numChildren = 1;
     byte storagePolicy = 0;
@@ -1204,7 +1204,7 @@ public class TestFsck {
     try {
       fsck.check(pathString, file, res);
     } catch (Exception e) {
-      fail("Unexpected exception "+ e.getMessage());
+      fail("Unexpected exception " + e.getMessage());
     }
     assertTrue(res.toString().contains("HEALTHY"));
   }
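
The mock rewiring in TestFsck follows from the refactoring: fsck no longer calls FSNamesystem#getBlockLocations, so the old thenThrow(FileNotFoundException) stub has nothing to attach to. A plausible reading of the replacement (inferred, not stated in the patch): the unstubbed iip.getLastINode() returns null by Mockito default, and INodeFile.valueOf(null, path) inside the static helper raises the same FileNotFoundException the old stub threw directly:

    INodesInPath iip = mock(INodesInPath.class);   // getLastINode() -> null (Mockito default)
    when(fsd.getINodesInPath(anyString(), anyBoolean())).thenReturn(iip);
    // FSDirStatAndListingOp.getBlockLocations -> INodeFile.valueOf(null, src)
    //   -> FileNotFoundException, which fsck.check() handles as before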