You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2014/12/02 06:36:35 UTC
hadoop git commit: HDFS-7450. Consolidate the implementation of
GetFileInfo(),
GetListings() and GetContentSummary() into a single class. Contributed by
Haohui Mai.
Repository: hadoop
Updated Branches:
refs/heads/trunk 9fa299025 -> 0af44ea84
HDFS-7450. Consolidate the implementation of GetFileInfo(), GetListings() and GetContentSummary() into a single class. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0af44ea8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0af44ea8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0af44ea8
Branch: refs/heads/trunk
Commit: 0af44ea8462437f8e7a8271b15a19677fd7f05a1
Parents: 9fa2990
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Dec 1 15:28:10 2014 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Dec 1 21:36:25 2014 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../server/namenode/FSDirStatAndListingOp.java | 480 +++++++++++++++++++
.../hdfs/server/namenode/FSDirectory.java | 362 +-------------
.../hdfs/server/namenode/FSEditLogLoader.java | 7 +-
.../hdfs/server/namenode/FSNamesystem.java | 128 ++---
.../hdfs/server/namenode/NameNodeAdapter.java | 3 +-
6 files changed, 534 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index dfdca8d..d5c1fe5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -407,6 +407,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7210. Avoid two separate RPC's namenode.append() and namenode.getFileInfo()
for an append call from DFSClient. (Vinayakumar B via umamahesh)
+ HDFS-7450. Consolidate the implementation of GetFileInfo(), GetListings() and
+ GetContentSummary() into a single class. (wheat9)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
new file mode 100644
index 0000000..35b3a6b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+
+class FSDirStatAndListingOp {
+ static DirectoryListing getListingInt(
+ FSDirectory fsd, final String srcArg, byte[] startAfter,
+ boolean needLocation)
+ throws IOException {
+ String src = srcArg;
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ String startAfterString = new String(startAfter);
+ src = fsd.resolvePath(pc, src, pathComponents);
+
+ // Get file name when startAfter is an INodePath
+ if (FSDirectory.isReservedName(startAfterString)) {
+ byte[][] startAfterComponents = FSDirectory
+ .getPathComponentsForReservedPath(startAfterString);
+ try {
+ String tmp = FSDirectory.resolvePath(src, startAfterComponents, fsd);
+ byte[][] regularPath = INode.getPathComponents(tmp);
+ startAfter = regularPath[regularPath.length - 1];
+ } catch (IOException e) {
+ // Possibly the inode is deleted
+ throw new DirectoryListingStartAfterNotFoundException(
+ "Can't find startAfter " + startAfterString);
+ }
+ }
+
+ boolean isSuperUser = true;
+ if (fsd.isPermissionEnabled()) {
+ if (fsd.isDir(src)) {
+ fsd.checkPathAccess(pc, src, FsAction.READ_EXECUTE);
+ } else {
+ fsd.checkTraverse(pc, src);
+ }
+ isSuperUser = pc.isSuperUser();
+ }
+ return getListing(fsd, src, startAfter, needLocation, isSuperUser);
+ }
+
+ /**
+ * Get the file info for a specific file.
+ *
+ * @param srcArg The string representation of the path to the file
+ * @param resolveLink whether to throw UnresolvedLinkException
+ * if src refers to a symlink
+ *
+ * @return object containing information regarding the file
+ * or null if file not found
+ */
+ static HdfsFileStatus getFileInfo(
+ FSDirectory fsd, String srcArg, boolean resolveLink)
+ throws IOException {
+ String src = srcArg;
+ if (!DFSUtil.isValidName(src)) {
+ throw new InvalidPathException("Invalid file name: " + src);
+ }
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsd.resolvePath(pc, src, pathComponents);
+ boolean isSuperUser = true;
+ if (fsd.isPermissionEnabled()) {
+ fsd.checkPermission(pc, src, false, null, null, null, null, false,
+ resolveLink);
+ isSuperUser = pc.isSuperUser();
+ }
+ return getFileInfo(fsd, src, resolveLink,
+ FSDirectory.isReservedRawName(srcArg), isSuperUser);
+ }
+
+ /**
+ * Returns true if the file is closed
+ */
+ static boolean isFileClosed(FSDirectory fsd, String src) throws IOException {
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsd.resolvePath(pc, src, pathComponents);
+ if (fsd.isPermissionEnabled()) {
+ fsd.checkTraverse(pc, src);
+ }
+ return !INodeFile.valueOf(fsd.getINode(src), src).isUnderConstruction();
+ }
+
+ static ContentSummary getContentSummary(
+ FSDirectory fsd, String src) throws IOException {
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ src = fsd.resolvePath(pc, src, pathComponents);
+ if (fsd.isPermissionEnabled()) {
+ fsd.checkPermission(pc, src, false, null, null, null,
+ FsAction.READ_EXECUTE);
+ }
+ return getContentSummaryInt(fsd, src);
+ }
+
+ /**
+ * Get a partial listing of the indicated directory
+ *
+ * We will stop when any of the following conditions is met:
+ * 1) this.lsLimit files have been added
+ * 2) needLocation is true AND enough files have been added such
+ * that at least this.lsLimit block locations are in the response
+ *
+ * @param fsd FSDirectory
+ * @param src the directory name
+ * @param startAfter the name to start listing after
+ * @param needLocation if block locations are returned
+ * @return a partial listing starting after startAfter
+ */
+ private static DirectoryListing getListing(
+ FSDirectory fsd, String src, byte[] startAfter, boolean needLocation,
+ boolean isSuperUser)
+ throws IOException {
+ String srcs = FSDirectory.normalizePath(src);
+ final boolean isRawPath = FSDirectory.isReservedRawName(src);
+
+ fsd.readLock();
+ try {
+ if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
+ return getSnapshotsListing(fsd, srcs, startAfter);
+ }
+ final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, true);
+ final INode[] inodes = inodesInPath.getINodes();
+ final int snapshot = inodesInPath.getPathSnapshotId();
+ final INode targetNode = inodes[inodes.length - 1];
+ if (targetNode == null)
+ return null;
+ byte parentStoragePolicy = isSuperUser ?
+ targetNode.getStoragePolicyID() : BlockStoragePolicySuite
+ .ID_UNSPECIFIED;
+
+ if (!targetNode.isDirectory()) {
+ return new DirectoryListing(
+ new HdfsFileStatus[]{createFileStatus(fsd,
+ HdfsFileStatus.EMPTY_NAME, targetNode, needLocation,
+ parentStoragePolicy, snapshot, isRawPath, inodesInPath)}, 0);
+ }
+
+ final INodeDirectory dirInode = targetNode.asDirectory();
+ final ReadOnlyList<INode> contents = dirInode.getChildrenList(snapshot);
+ int startChild = INodeDirectory.nextChild(contents, startAfter);
+ int totalNumChildren = contents.size();
+ int numOfListing = Math.min(totalNumChildren - startChild,
+ fsd.getLsLimit());
+ int locationBudget = fsd.getLsLimit();
+ int listingCnt = 0;
+ HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
+ for (int i=0; i<numOfListing && locationBudget>0; i++) {
+ INode cur = contents.get(startChild+i);
+ byte curPolicy = isSuperUser && !cur.isSymlink()?
+ cur.getLocalStoragePolicyID():
+ BlockStoragePolicySuite.ID_UNSPECIFIED;
+ listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur,
+ needLocation, fsd.getStoragePolicyID(curPolicy,
+ parentStoragePolicy), snapshot, isRawPath, inodesInPath);
+ listingCnt++;
+ if (needLocation) {
+ // Once we hit lsLimit locations, stop.
+ // This helps to prevent excessively large response payloads.
+ // Approximate #locations with locatedBlockCount() * repl_factor
+ LocatedBlocks blks =
+ ((HdfsLocatedFileStatus)listing[i]).getBlockLocations();
+ locationBudget -= (blks == null) ? 0 :
+ blks.locatedBlockCount() * listing[i].getReplication();
+ }
+ }
+ // truncate return array if necessary
+ if (listingCnt < numOfListing) {
+ listing = Arrays.copyOf(listing, listingCnt);
+ }
+ return new DirectoryListing(
+ listing, totalNumChildren-startChild-listingCnt);
+ } finally {
+ fsd.readUnlock();
+ }
+ }
+
+ /**
+ * Get a listing of all the snapshots of a snapshottable directory
+ */
+ private static DirectoryListing getSnapshotsListing(
+ FSDirectory fsd, String src, byte[] startAfter)
+ throws IOException {
+ Preconditions.checkState(fsd.hasReadLock());
+ Preconditions.checkArgument(
+ src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
+ "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
+
+ final String dirPath = FSDirectory.normalizePath(src.substring(0,
+ src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
+
+ final INode node = fsd.getINode(dirPath);
+ final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath);
+ final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature();
+ if (sf == null) {
+ throw new SnapshotException(
+ "Directory is not a snapshottable directory: " + dirPath);
+ }
+ final ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
+ int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter);
+ skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1;
+ int numOfListing = Math.min(snapshots.size() - skipSize, fsd.getLsLimit());
+ final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
+ for (int i = 0; i < numOfListing; i++) {
+ Snapshot.Root sRoot = snapshots.get(i + skipSize).getRoot();
+ listing[i] = createFileStatus(fsd, sRoot.getLocalNameBytes(), sRoot,
+ BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
+ false, null);
+ }
+ return new DirectoryListing(
+ listing, snapshots.size() - skipSize - numOfListing);
+ }
+
+ /** Get the file info for a specific file.
+ * @param fsd FSDirectory
+ * @param src The string representation of the path to the file
+ * @param resolveLink whether to throw UnresolvedLinkException
+ * @param isRawPath true if a /.reserved/raw pathname was passed by the user
+ * @param includeStoragePolicy whether to include storage policy
+ * @return object containing information regarding the file
+ * or null if file not found
+ */
+ static HdfsFileStatus getFileInfo(
+ FSDirectory fsd, String src, boolean resolveLink, boolean isRawPath,
+ boolean includeStoragePolicy)
+ throws IOException {
+ String srcs = FSDirectory.normalizePath(src);
+ fsd.readLock();
+ try {
+ if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
+ return getFileInfo4DotSnapshot(fsd, srcs);
+ }
+ final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, resolveLink);
+ final INode[] inodes = inodesInPath.getINodes();
+ final INode i = inodes[inodes.length - 1];
+ byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
+ i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
+ return i == null ? null : createFileStatus(fsd,
+ HdfsFileStatus.EMPTY_NAME, i, policyId,
+ inodesInPath.getPathSnapshotId(), isRawPath, inodesInPath);
+ } finally {
+ fsd.readUnlock();
+ }
+ }
+
+ /**
+ * Currently we only support "ls /xxx/.snapshot" which will return all the
+ * snapshots of a directory. The FSCommand Ls will first call getFileInfo to
+ * make sure the file/directory exists (before the real getListing call).
+ * Since we do not have a real INode for ".snapshot", we return an empty
+ * non-null HdfsFileStatus here.
+ */
+ private static HdfsFileStatus getFileInfo4DotSnapshot(
+ FSDirectory fsd, String src)
+ throws UnresolvedLinkException {
+ if (fsd.getINode4DotSnapshot(src) != null) {
+ return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
+ HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
+ BlockStoragePolicySuite.ID_UNSPECIFIED);
+ }
+ return null;
+ }
+
+ /**
+ * create an hdfs file status from an inode
+ *
+ * @param fsd FSDirectory
+ * @param path the local name
+ * @param node inode
+ * @param needLocation if block locations need to be included or not
+ * @param isRawPath true if this is being called on behalf of a path in
+ * /.reserved/raw
+ * @return a file status
+ * @throws java.io.IOException if any error occurs
+ */
+ static HdfsFileStatus createFileStatus(
+ FSDirectory fsd, byte[] path, INode node, boolean needLocation,
+ byte storagePolicy, int snapshot, boolean isRawPath, INodesInPath iip)
+ throws IOException {
+ if (needLocation) {
+ return createLocatedFileStatus(fsd, path, node, storagePolicy,
+ snapshot, isRawPath, iip);
+ } else {
+ return createFileStatus(fsd, path, node, storagePolicy, snapshot,
+ isRawPath, iip);
+ }
+ }
+
+ /**
+ * Create FileStatus by file INode
+ */
+ static HdfsFileStatus createFileStatus(
+ FSDirectory fsd, byte[] path, INode node, byte storagePolicy,
+ int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
+ long size = 0; // length is zero for directories
+ short replication = 0;
+ long blocksize = 0;
+ final boolean isEncrypted;
+
+ final FileEncryptionInfo feInfo = isRawPath ? null :
+ fsd.getFileEncryptionInfo(node, snapshot, iip);
+
+ if (node.isFile()) {
+ final INodeFile fileNode = node.asFile();
+ size = fileNode.computeFileSize(snapshot);
+ replication = fileNode.getFileReplication(snapshot);
+ blocksize = fileNode.getPreferredBlockSize();
+ isEncrypted = (feInfo != null) ||
+ (isRawPath && fsd.isInAnEZ(INodesInPath.fromINode(node)));
+ } else {
+ isEncrypted = fsd.isInAnEZ(INodesInPath.fromINode(node));
+ }
+
+ int childrenNum = node.isDirectory() ?
+ node.asDirectory().getChildrenNum(snapshot) : 0;
+
+ return new HdfsFileStatus(
+ size,
+ node.isDirectory(),
+ replication,
+ blocksize,
+ node.getModificationTime(snapshot),
+ node.getAccessTime(snapshot),
+ getPermissionForFileStatus(node, snapshot, isEncrypted),
+ node.getUserName(snapshot),
+ node.getGroupName(snapshot),
+ node.isSymlink() ? node.asSymlink().getSymlink() : null,
+ path,
+ node.getId(),
+ childrenNum,
+ feInfo,
+ storagePolicy);
+ }
+
+ /**
+ * Create FileStatus with location info by file INode
+ */
+ private static HdfsLocatedFileStatus createLocatedFileStatus(
+ FSDirectory fsd, byte[] path, INode node, byte storagePolicy,
+ int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
+ assert fsd.hasReadLock();
+ long size = 0; // length is zero for directories
+ short replication = 0;
+ long blocksize = 0;
+ LocatedBlocks loc = null;
+ final boolean isEncrypted;
+ final FileEncryptionInfo feInfo = isRawPath ? null :
+ fsd.getFileEncryptionInfo(node, snapshot, iip);
+ if (node.isFile()) {
+ final INodeFile fileNode = node.asFile();
+ size = fileNode.computeFileSize(snapshot);
+ replication = fileNode.getFileReplication(snapshot);
+ blocksize = fileNode.getPreferredBlockSize();
+
+ final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID;
+ final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
+ final long fileSize = !inSnapshot && isUc ?
+ fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
+
+ loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
+ fileNode.getBlocks(), fileSize, isUc, 0L, size, false,
+ inSnapshot, feInfo);
+ if (loc == null) {
+ loc = new LocatedBlocks();
+ }
+ isEncrypted = (feInfo != null) ||
+ (isRawPath && fsd.isInAnEZ(INodesInPath.fromINode(node)));
+ } else {
+ isEncrypted = fsd.isInAnEZ(INodesInPath.fromINode(node));
+ }
+ int childrenNum = node.isDirectory() ?
+ node.asDirectory().getChildrenNum(snapshot) : 0;
+
+ HdfsLocatedFileStatus status =
+ new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
+ blocksize, node.getModificationTime(snapshot),
+ node.getAccessTime(snapshot),
+ getPermissionForFileStatus(node, snapshot, isEncrypted),
+ node.getUserName(snapshot), node.getGroupName(snapshot),
+ node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
+ node.getId(), loc, childrenNum, feInfo, storagePolicy);
+ // Set caching information for the located blocks.
+ if (loc != null) {
+ CacheManager cacheManager = fsd.getFSNamesystem().getCacheManager();
+ for (LocatedBlock lb: loc.getLocatedBlocks()) {
+ cacheManager.setCachedLocations(lb);
+ }
+ }
+ return status;
+ }
+
+ /**
+ * Returns an inode's FsPermission for use in an outbound FileStatus. If the
+ * inode has an ACL or is for an encrypted file/dir, then this method will
+ * return an FsPermissionExtension.
+ *
+ * @param node INode to check
+ * @param snapshot int snapshot ID
+ * @param isEncrypted boolean true if the file/dir is encrypted
+ * @return FsPermission from inode, with ACL bit on if the inode has an ACL
+ * and encrypted bit on if it represents an encrypted file/dir.
+ */
+ private static FsPermission getPermissionForFileStatus(
+ INode node, int snapshot, boolean isEncrypted) {
+ FsPermission perm = node.getFsPermission(snapshot);
+ boolean hasAcl = node.getAclFeature(snapshot) != null;
+ if (hasAcl || isEncrypted) {
+ perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
+ }
+ return perm;
+ }
+
+ private static ContentSummary getContentSummaryInt(
+ FSDirectory fsd, String src) throws IOException {
+ String srcs = FSDirectory.normalizePath(src);
+ fsd.readLock();
+ try {
+ INode targetNode = fsd.getNode(srcs, false);
+ if (targetNode == null) {
+ throw new FileNotFoundException("File does not exist: " + srcs);
+ }
+ else {
+ // Make it relinquish locks everytime contentCountLimit entries are
+ // processed. 0 means disabled. I.e. blocking for the entire duration.
+ ContentSummaryComputationContext cscc =
+ new ContentSummaryComputationContext(fsd, fsd.getFSNamesystem(),
+ fsd.getContentCountLimit());
+ ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc);
+ fsd.addYieldCount(cscc.getYieldCount());
+ return cs;
+ }
+ } finally {
+ fsd.readUnlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 07cad42..0c0b2af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -63,16 +63,11 @@ import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
-import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
@@ -86,16 +81,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
import org.apache.hadoop.hdfs.util.ByteArray;
import org.apache.hadoop.hdfs.util.ChunkedArrayList;
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
@@ -259,7 +252,7 @@ public class FSDirectory implements Closeable {
ezManager = new EncryptionZoneManager(this, conf);
}
- private FSNamesystem getFSNamesystem() {
+ FSNamesystem getFSNamesystem() {
return namesystem;
}
@@ -276,6 +269,14 @@ public class FSDirectory implements Closeable {
return isPermissionEnabled;
}
+ int getLsLimit() {
+ return lsLimit;
+ }
+
+ int getContentCountLimit() {
+ return contentCountLimit;
+ }
+
FSEditLog getEditLog() {
return editLog;
}
@@ -1343,172 +1344,12 @@ public class FSDirectory implements Closeable {
return removed;
}
- private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
+ byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy :
parentPolicy;
}
- /**
- * Get a partial listing of the indicated directory
- *
- * We will stop when any of the following conditions is met:
- * 1) this.lsLimit files have been added
- * 2) needLocation is true AND enough files have been added such
- * that at least this.lsLimit block locations are in the response
- *
- * @param src the directory name
- * @param startAfter the name to start listing after
- * @param needLocation if block locations are returned
- * @return a partial listing starting after startAfter
- */
- DirectoryListing getListing(String src, byte[] startAfter,
- boolean needLocation, boolean isSuperUser)
- throws UnresolvedLinkException, IOException {
- String srcs = normalizePath(src);
- final boolean isRawPath = isReservedRawName(src);
-
- readLock();
- try {
- if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
- return getSnapshotsListing(srcs, startAfter);
- }
- final INodesInPath inodesInPath = getINodesInPath(srcs, true);
- final INode[] inodes = inodesInPath.getINodes();
- final int snapshot = inodesInPath.getPathSnapshotId();
- final INode targetNode = inodes[inodes.length - 1];
- if (targetNode == null)
- return null;
- byte parentStoragePolicy = isSuperUser ?
- targetNode.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
-
- if (!targetNode.isDirectory()) {
- return new DirectoryListing(
- new HdfsFileStatus[]{createFileStatus(HdfsFileStatus.EMPTY_NAME,
- targetNode, needLocation, parentStoragePolicy, snapshot,
- isRawPath, inodesInPath)}, 0);
- }
-
- final INodeDirectory dirInode = targetNode.asDirectory();
- final ReadOnlyList<INode> contents = dirInode.getChildrenList(snapshot);
- int startChild = INodeDirectory.nextChild(contents, startAfter);
- int totalNumChildren = contents.size();
- int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit);
- int locationBudget = this.lsLimit;
- int listingCnt = 0;
- HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
- for (int i=0; i<numOfListing && locationBudget>0; i++) {
- INode cur = contents.get(startChild+i);
- byte curPolicy = isSuperUser && !cur.isSymlink()?
- cur.getLocalStoragePolicyID():
- BlockStoragePolicySuite.ID_UNSPECIFIED;
- listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation,
- getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot,
- isRawPath, inodesInPath);
- listingCnt++;
- if (needLocation) {
- // Once we hit lsLimit locations, stop.
- // This helps to prevent excessively large response payloads.
- // Approximate #locations with locatedBlockCount() * repl_factor
- LocatedBlocks blks =
- ((HdfsLocatedFileStatus)listing[i]).getBlockLocations();
- locationBudget -= (blks == null) ? 0 :
- blks.locatedBlockCount() * listing[i].getReplication();
- }
- }
- // truncate return array if necessary
- if (listingCnt < numOfListing) {
- listing = Arrays.copyOf(listing, listingCnt);
- }
- return new DirectoryListing(
- listing, totalNumChildren-startChild-listingCnt);
- } finally {
- readUnlock();
- }
- }
-
- /**
- * Get a listing of all the snapshots of a snapshottable directory
- */
- private DirectoryListing getSnapshotsListing(String src, byte[] startAfter)
- throws UnresolvedLinkException, IOException {
- Preconditions.checkState(hasReadLock());
- Preconditions.checkArgument(
- src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
- "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
-
- final String dirPath = normalizePath(src.substring(0,
- src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
-
- final INode node = this.getINode(dirPath);
- final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath);
- final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature();
- if (sf == null) {
- throw new SnapshotException(
- "Directory is not a snapshottable directory: " + dirPath);
- }
- final ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
- int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter);
- skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1;
- int numOfListing = Math.min(snapshots.size() - skipSize, this.lsLimit);
- final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
- for (int i = 0; i < numOfListing; i++) {
- Root sRoot = snapshots.get(i + skipSize).getRoot();
- listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot,
- BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
- false, null);
- }
- return new DirectoryListing(
- listing, snapshots.size() - skipSize - numOfListing);
- }
-
- /** Get the file info for a specific file.
- * @param src The string representation of the path to the file
- * @param resolveLink whether to throw UnresolvedLinkException
- * @param isRawPath true if a /.reserved/raw pathname was passed by the user
- * @param includeStoragePolicy whether to include storage policy
- * @return object containing information regarding the file
- * or null if file not found
- */
- HdfsFileStatus getFileInfo(String src, boolean resolveLink,
- boolean isRawPath, boolean includeStoragePolicy)
- throws IOException {
- String srcs = normalizePath(src);
- readLock();
- try {
- if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
- return getFileInfo4DotSnapshot(srcs);
- }
- final INodesInPath inodesInPath = getINodesInPath(srcs, resolveLink);
- final INode[] inodes = inodesInPath.getINodes();
- final INode i = inodes[inodes.length - 1];
- byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
- i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
- return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
- policyId, inodesInPath.getPathSnapshotId(), isRawPath,
- inodesInPath);
- } finally {
- readUnlock();
- }
- }
-
- /**
- * Currently we only support "ls /xxx/.snapshot" which will return all the
- * snapshots of a directory. The FSCommand Ls will first call getFileInfo to
- * make sure the file/directory exists (before the real getListing call).
- * Since we do not have a real INode for ".snapshot", we return an empty
- * non-null HdfsFileStatus here.
- */
- private HdfsFileStatus getFileInfo4DotSnapshot(String src)
- throws UnresolvedLinkException {
- if (getINode4DotSnapshot(src) != null) {
- return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
- HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
- BlockStoragePolicySuite.ID_UNSPECIFIED);
- }
- return null;
- }
-
- private INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
+ INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
Preconditions.checkArgument(
src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
"%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
@@ -2090,36 +1931,15 @@ public class FSDirectory implements Closeable {
return src;
}
- ContentSummary getContentSummary(String src)
- throws FileNotFoundException, UnresolvedLinkException {
- String srcs = normalizePath(src);
- readLock();
- try {
- INode targetNode = getNode(srcs, false);
- if (targetNode == null) {
- throw new FileNotFoundException("File does not exist: " + srcs);
- }
- else {
- // Make it relinquish locks everytime contentCountLimit entries are
- // processed. 0 means disabled. I.e. blocking for the entire duration.
- ContentSummaryComputationContext cscc =
-
- new ContentSummaryComputationContext(this, getFSNamesystem(),
- contentCountLimit);
- ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc);
- yieldCount += cscc.getYieldCount();
- return cs;
- }
- } finally {
- readUnlock();
- }
- }
-
@VisibleForTesting
public long getYieldCount() {
return yieldCount;
}
+ void addYieldCount(long value) {
+ yieldCount += value;
+ }
+
public INodeMap getINodeMap() {
return inodeMap;
}
@@ -2329,153 +2149,6 @@ public class FSDirectory implements Closeable {
}
/**
- * create an hdfs file status from an inode
- *
- * @param path the local name
- * @param node inode
- * @param needLocation if block locations need to be included or not
- * @param isRawPath true if this is being called on behalf of a path in
- * /.reserved/raw
- * @return a file status
- * @throws IOException if any error occurs
- */
- private HdfsFileStatus createFileStatus(byte[] path, INode node,
- boolean needLocation, byte storagePolicy, int snapshot,
- boolean isRawPath, INodesInPath iip)
- throws IOException {
- if (needLocation) {
- return createLocatedFileStatus(path, node, storagePolicy, snapshot,
- isRawPath, iip);
- } else {
- return createFileStatus(path, node, storagePolicy, snapshot,
- isRawPath, iip);
- }
- }
-
- /**
- * Create FileStatus by file INode
- */
- HdfsFileStatus createFileStatus(byte[] path, INode node, byte storagePolicy,
- int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
- long size = 0; // length is zero for directories
- short replication = 0;
- long blocksize = 0;
- final boolean isEncrypted;
-
- final FileEncryptionInfo feInfo = isRawPath ? null :
- getFileEncryptionInfo(node, snapshot, iip);
-
- if (node.isFile()) {
- final INodeFile fileNode = node.asFile();
- size = fileNode.computeFileSize(snapshot);
- replication = fileNode.getFileReplication(snapshot);
- blocksize = fileNode.getPreferredBlockSize();
- isEncrypted = (feInfo != null) ||
- (isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
- } else {
- isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
- }
-
- int childrenNum = node.isDirectory() ?
- node.asDirectory().getChildrenNum(snapshot) : 0;
-
- return new HdfsFileStatus(
- size,
- node.isDirectory(),
- replication,
- blocksize,
- node.getModificationTime(snapshot),
- node.getAccessTime(snapshot),
- getPermissionForFileStatus(node, snapshot, isEncrypted),
- node.getUserName(snapshot),
- node.getGroupName(snapshot),
- node.isSymlink() ? node.asSymlink().getSymlink() : null,
- path,
- node.getId(),
- childrenNum,
- feInfo,
- storagePolicy);
- }
-
- /**
- * Create FileStatus with location info by file INode
- */
- private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path, INode node,
- byte storagePolicy, int snapshot, boolean isRawPath,
- INodesInPath iip) throws IOException {
- assert hasReadLock();
- long size = 0; // length is zero for directories
- short replication = 0;
- long blocksize = 0;
- LocatedBlocks loc = null;
- final boolean isEncrypted;
- final FileEncryptionInfo feInfo = isRawPath ? null :
- getFileEncryptionInfo(node, snapshot, iip);
- if (node.isFile()) {
- final INodeFile fileNode = node.asFile();
- size = fileNode.computeFileSize(snapshot);
- replication = fileNode.getFileReplication(snapshot);
- blocksize = fileNode.getPreferredBlockSize();
-
- final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID;
- final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
- final long fileSize = !inSnapshot && isUc ?
- fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
-
- loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
- fileNode.getBlocks(), fileSize, isUc, 0L, size, false,
- inSnapshot, feInfo);
- if (loc == null) {
- loc = new LocatedBlocks();
- }
- isEncrypted = (feInfo != null) ||
- (isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
- } else {
- isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
- }
- int childrenNum = node.isDirectory() ?
- node.asDirectory().getChildrenNum(snapshot) : 0;
-
- HdfsLocatedFileStatus status =
- new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
- blocksize, node.getModificationTime(snapshot),
- node.getAccessTime(snapshot),
- getPermissionForFileStatus(node, snapshot, isEncrypted),
- node.getUserName(snapshot), node.getGroupName(snapshot),
- node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
- node.getId(), loc, childrenNum, feInfo, storagePolicy);
- // Set caching information for the located blocks.
- if (loc != null) {
- CacheManager cacheManager = namesystem.getCacheManager();
- for (LocatedBlock lb: loc.getLocatedBlocks()) {
- cacheManager.setCachedLocations(lb);
- }
- }
- return status;
- }
-
- /**
- * Returns an inode's FsPermission for use in an outbound FileStatus. If the
- * inode has an ACL or is for an encrypted file/dir, then this method will
- * return an FsPermissionExtension.
- *
- * @param node INode to check
- * @param snapshot int snapshot ID
- * @param isEncrypted boolean true if the file/dir is encrypted
- * @return FsPermission from inode, with ACL bit on if the inode has an ACL
- * and encrypted bit on if it represents an encrypted file/dir.
- */
- private static FsPermission getPermissionForFileStatus(INode node,
- int snapshot, boolean isEncrypted) {
- FsPermission perm = node.getFsPermission(snapshot);
- boolean hasAcl = node.getAclFeature(snapshot) != null;
- if (hasAcl || isEncrypted) {
- perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
- }
- return perm;
- }
-
- /**
* Add the specified path into the namespace.
*/
INodeSymlink addSymlink(long id, String path, String target,
@@ -3322,6 +2995,7 @@ public class FSDirectory implements Closeable {
HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
throws IOException {
return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
- ? getFileInfo(path, resolveSymlink, false, false) : null;
+ ? FSDirStatAndListingOp.getFileInfo(this, path, resolveSymlink, false,
+ false) : null;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 1a51b87..d57d9b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -372,8 +372,8 @@ public class FSEditLogLoader {
// add the op into retry cache if necessary
if (toAddRetryCache) {
- HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
- HdfsFileStatus.EMPTY_NAME, newFile,
+ HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
+ fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, newFile,
BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
false, iip);
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
@@ -393,7 +393,8 @@ public class FSEditLogLoader {
// add the op into retry cache is necessary
if (toAddRetryCache) {
- HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
+ HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
+ fsNamesys.dir,
HdfsFileStatus.EMPTY_NAME, newFile,
BlockStoragePolicySuite.ID_UNSPECIFIED,
Snapshot.CURRENT_STATE_ID, false, iip);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 390bc20..543f47a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -143,7 +143,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
@@ -2474,7 +2473,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
toRemoveBlocks = startFileInternal(pc, src, permissions, holder,
clientMachine, create, overwrite, createParent, replication,
blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
- stat = dir.getFileInfo(src, false,
+ stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
FSDirectory.isReservedRawName(srcArg), true);
} catch (StandbyException se) {
skipSync = true;
@@ -2923,8 +2922,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkNameNodeSafeMode("Cannot append to file" + src);
src = dir.resolvePath(pc, src, pathComponents);
lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
- stat = dir.getFileInfo(src, false, FSDirectory.isReservedRawName(srcArg),
- true);
+ stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
+ FSDirectory.isReservedRawName(srcArg), true);
} catch (StandbyException se) {
skipSync = true;
throw se;
@@ -3921,7 +3920,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* Get the file info for a specific file.
*
* @param srcArg The string representation of the path to the file
- * @param resolveLink whether to throw UnresolvedLinkException
+ * @param resolveLink whether to throw UnresolvedLinkException
* if src refers to a symlink
*
* @throws AccessControlException if access is denied
@@ -3929,63 +3928,37 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*
* @return object containing information regarding the file
* or null if file not found
- * @throws StandbyException
+ * @throws StandbyException
*/
- HdfsFileStatus getFileInfo(final String srcArg, boolean resolveLink)
- throws AccessControlException, UnresolvedLinkException,
- StandbyException, IOException {
- String src = srcArg;
- if (!DFSUtil.isValidName(src)) {
- throw new InvalidPathException("Invalid file name: " + src);
- }
- HdfsFileStatus stat = null;
- FSPermissionChecker pc = getPermissionChecker();
+ HdfsFileStatus getFileInfo(final String src, boolean resolveLink)
+ throws IOException {
checkOperation(OperationCategory.READ);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ HdfsFileStatus stat = null;
readLock();
try {
checkOperation(OperationCategory.READ);
- src = dir.resolvePath(pc, src, pathComponents);
- boolean isSuperUser = true;
- if (isPermissionEnabled) {
- checkPermission(pc, src, false, null, null, null, null, false,
- resolveLink);
- isSuperUser = pc.isSuperUser();
- }
- stat = dir.getFileInfo(src, resolveLink,
- FSDirectory.isReservedRawName(srcArg), isSuperUser);
+ stat = FSDirStatAndListingOp.getFileInfo(dir, src, resolveLink);
} catch (AccessControlException e) {
- logAuditEvent(false, "getfileinfo", srcArg);
+ logAuditEvent(false, "getfileinfo", src);
throw e;
} finally {
readUnlock();
}
- logAuditEvent(true, "getfileinfo", srcArg);
+ logAuditEvent(true, "getfileinfo", src);
return stat;
}
-
+
/**
* Returns true if the file is closed
*/
- boolean isFileClosed(final String srcArg)
- throws AccessControlException, UnresolvedLinkException,
- StandbyException, IOException {
- String src = srcArg;
- FSPermissionChecker pc = getPermissionChecker();
+ boolean isFileClosed(final String src) throws IOException {
checkOperation(OperationCategory.READ);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
- src = dir.resolvePath(pc, src, pathComponents);
checkOperation(OperationCategory.READ);
- if (isPermissionEnabled) {
- checkTraverse(pc, src);
- }
- return !INodeFile.valueOf(dir.getINode(src), src).isUnderConstruction();
+ return FSDirStatAndListingOp.isFileClosed(dir, src);
} catch (AccessControlException e) {
- if (isAuditEnabled() && isExternalInvocation()) {
- logAuditEvent(false, "isFileClosed", srcArg);
- }
+ logAuditEvent(false, "isFileClosed", src);
throw e;
} finally {
readUnlock();
@@ -4182,7 +4155,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Get the content summary for a specific file/dir.
*
- * @param srcArg The string representation of the path to the file
+ * @param src The string representation of the path to the file
*
* @throws AccessControlException if access is denied
* @throws UnresolvedLinkException if a symlink is encountered.
@@ -4193,27 +4166,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @return object containing information regarding the file
* or null if file not found
*/
- ContentSummary getContentSummary(final String srcArg) throws IOException {
- String src = srcArg;
- FSPermissionChecker pc = getPermissionChecker();
- checkOperation(OperationCategory.READ);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ ContentSummary getContentSummary(final String src) throws IOException {
readLock();
boolean success = true;
try {
- checkOperation(OperationCategory.READ);
- src = dir.resolvePath(pc, src, pathComponents);
- if (isPermissionEnabled) {
- checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
- }
- return dir.getContentSummary(src);
-
+ return FSDirStatAndListingOp.getContentSummary(dir, src);
} catch (AccessControlException ace) {
success = false;
throw ace;
} finally {
readUnlock();
- logAuditEvent(success, "contentSummary", srcArg);
+ logAuditEvent(success, "contentSummary", src);
}
}
@@ -4722,58 +4685,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation)
- throws AccessControlException, UnresolvedLinkException, IOException {
+ throws IOException {
+ checkOperation(OperationCategory.READ);
+ DirectoryListing dl = null;
+ readLock();
try {
- return getListingInt(src, startAfter, needLocation);
+ checkOperation(NameNode.OperationCategory.READ);
+ dl = FSDirStatAndListingOp.getListingInt(dir, src, startAfter,
+ needLocation);
} catch (AccessControlException e) {
logAuditEvent(false, "listStatus", src);
throw e;
- }
- }
-
- private DirectoryListing getListingInt(final String srcArg, byte[] startAfter,
- boolean needLocation)
- throws AccessControlException, UnresolvedLinkException, IOException {
- String src = srcArg;
- DirectoryListing dl;
- FSPermissionChecker pc = getPermissionChecker();
- checkOperation(OperationCategory.READ);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
- String startAfterString = new String(startAfter);
- readLock();
- try {
- checkOperation(OperationCategory.READ);
- src = dir.resolvePath(pc, src, pathComponents);
-
- // Get file name when startAfter is an INodePath
- if (FSDirectory.isReservedName(startAfterString)) {
- byte[][] startAfterComponents = FSDirectory
- .getPathComponentsForReservedPath(startAfterString);
- try {
- String tmp = FSDirectory.resolvePath(src, startAfterComponents, dir);
- byte[][] regularPath = INode.getPathComponents(tmp);
- startAfter = regularPath[regularPath.length - 1];
- } catch (IOException e) {
- // Possibly the inode is deleted
- throw new DirectoryListingStartAfterNotFoundException(
- "Can't find startAfter " + startAfterString);
- }
- }
-
- boolean isSuperUser = true;
- if (isPermissionEnabled) {
- if (dir.isDir(src)) {
- checkPathAccess(pc, src, FsAction.READ_EXECUTE);
- } else {
- checkTraverse(pc, src);
- }
- isSuperUser = pc.isSuperUser();
- }
- logAuditEvent(true, "listStatus", srcArg);
- dl = dir.getListing(src, startAfter, needLocation, isSuperUser);
} finally {
readUnlock();
}
+ logAuditEvent(true, "listStatus", src);
return dl;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af44ea8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index c32ed67..1a42e28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -70,7 +70,8 @@ public class NameNodeAdapter {
public static HdfsFileStatus getFileInfo(NameNode namenode, String src,
boolean resolveLink) throws AccessControlException, UnresolvedLinkException,
StandbyException, IOException {
- return namenode.getNamesystem().getFileInfo(src, resolveLink);
+ return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem()
+ .getFSDirectory(), src, resolveLink);
}
public static boolean mkdirs(NameNode namenode, String src,