You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/12/09 04:31:26 UTC
[32/41] hadoop git commit: HDFS-7476. Consolidate ACL-related operations to a single class. Contributed by Haohui Mai.
HDFS-7476. Consolidate ACL-related operations to a single class. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9297f980
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9297f980
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9297f980
Branch: refs/heads/YARN-2139
Commit: 9297f980c2de8886ff970946a2513e6890cd5552
Parents: e227fb8
Author: cnauroth <cn...@apache.org>
Authored: Sat Dec 6 14:20:00 2014 -0800
Committer: cnauroth <cn...@apache.org>
Committed: Sat Dec 6 14:20:00 2014 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/server/namenode/AclStorage.java | 33 ---
.../hadoop/hdfs/server/namenode/FSDirAclOp.java | 244 +++++++++++++++++++
.../hdfs/server/namenode/FSDirectory.java | 158 ++----------
.../hdfs/server/namenode/FSEditLogLoader.java | 2 +-
.../hdfs/server/namenode/FSNamesystem.java | 119 ++-------
.../hdfs/server/namenode/TestAuditLogger.java | 79 ++----
7 files changed, 318 insertions(+), 320 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 87b02c4..769be43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -438,6 +438,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7459. Consolidate cache-related implementation in FSNamesystem into
a single class. (wheat9)
+ HDFS-7476. Consolidate ACL-related operations to a single class.
+ (wheat9 via cnauroth)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
index ac30597..a866046 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
@@ -241,39 +241,6 @@ final class AclStorage {
}
/**
- * Completely removes the ACL from an inode.
- *
- * @param inode INode to update
- * @param snapshotId int latest snapshot ID of inode
- * @throws QuotaExceededException if quota limit is exceeded
- */
- public static void removeINodeAcl(INode inode, int snapshotId)
- throws QuotaExceededException {
- AclFeature f = inode.getAclFeature();
- if (f == null) {
- return;
- }
-
- FsPermission perm = inode.getFsPermission();
- List<AclEntry> featureEntries = getEntriesFromAclFeature(f);
- if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
- // Restore group permissions from the feature's entry to permission
- // bits, overwriting the mask, which is not part of a minimal ACL.
- AclEntry groupEntryKey = new AclEntry.Builder()
- .setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).build();
- int groupEntryIndex = Collections.binarySearch(featureEntries,
- groupEntryKey, AclTransformation.ACL_ENTRY_COMPARATOR);
- assert groupEntryIndex >= 0;
- FsAction groupPerm = featureEntries.get(groupEntryIndex).getPermission();
- FsPermission newPerm = new FsPermission(perm.getUserAction(), groupPerm,
- perm.getOtherAction(), perm.getStickyBit());
- inode.setPermission(newPerm, snapshotId);
- }
-
- inode.removeAclFeature(snapshotId);
- }
-
- /**
* Updates an inode with a new ACL. This method takes a full logical ACL and
* stores the entries to the inode's {@link FsPermission} and
* {@link AclFeature}.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
new file mode 100644
index 0000000..ac899aa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.AclException;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+class FSDirAclOp {
+ static HdfsFileStatus modifyAclEntries(
+ FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
+ throws IOException {
+ String src = srcArg;
+ checkAclsConfigFlag(fsd);
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsd.resolvePath(pc, src, pathComponents);
+ fsd.writeLock();
+ try {
+ INodesInPath iip = fsd.getINodesInPath4Write(
+ FSDirectory.normalizePath(src), true);
+ fsd.checkOwner(pc, iip);
+ INode inode = FSDirectory.resolveLastINode(src, iip);
+ int snapshotId = iip.getLatestSnapshotId();
+ List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
+ List<AclEntry> newAcl = AclTransformation.mergeAclEntries(
+ existingAcl, aclSpec);
+ AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+ fsd.getEditLog().logSetAcl(src, newAcl);
+ } finally {
+ fsd.writeUnlock();
+ }
+ return fsd.getAuditFileInfo(src, false);
+ }
+
+ static HdfsFileStatus removeAclEntries(
+ FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
+ throws IOException {
+ String src = srcArg;
+ checkAclsConfigFlag(fsd);
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsd.resolvePath(pc, src, pathComponents);
+ fsd.writeLock();
+ try {
+ INodesInPath iip = fsd.getINodesInPath4Write(
+ FSDirectory.normalizePath(src), true);
+ fsd.checkOwner(pc, iip);
+ INode inode = FSDirectory.resolveLastINode(src, iip);
+ int snapshotId = iip.getLatestSnapshotId();
+ List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
+ List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
+ existingAcl, aclSpec);
+ AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+ fsd.getEditLog().logSetAcl(src, newAcl);
+ } finally {
+ fsd.writeUnlock();
+ }
+ return fsd.getAuditFileInfo(src, false);
+ }
+
+ static HdfsFileStatus removeDefaultAcl(FSDirectory fsd, final String srcArg)
+ throws IOException {
+ String src = srcArg;
+ checkAclsConfigFlag(fsd);
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsd.resolvePath(pc, src, pathComponents);
+ fsd.writeLock();
+ try {
+ INodesInPath iip = fsd.getINodesInPath4Write(
+ FSDirectory.normalizePath(src), true);
+ fsd.checkOwner(pc, iip);
+ INode inode = FSDirectory.resolveLastINode(src, iip);
+ int snapshotId = iip.getLatestSnapshotId();
+ List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
+ List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
+ existingAcl);
+ AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+ fsd.getEditLog().logSetAcl(src, newAcl);
+ } finally {
+ fsd.writeUnlock();
+ }
+ return fsd.getAuditFileInfo(src, false);
+ }
+
+ static HdfsFileStatus removeAcl(FSDirectory fsd, final String srcArg)
+ throws IOException {
+ String src = srcArg;
+ checkAclsConfigFlag(fsd);
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsd.resolvePath(pc, src, pathComponents);
+ fsd.writeLock();
+ try {
+ INodesInPath iip = fsd.getINodesInPath4Write(src);
+ fsd.checkOwner(pc, iip);
+ unprotectedRemoveAcl(fsd, src);
+ } finally {
+ fsd.writeUnlock();
+ }
+ fsd.getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
+ return fsd.getAuditFileInfo(src, false);
+ }
+
+ static HdfsFileStatus setAcl(
+ FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
+ throws IOException {
+ String src = srcArg;
+ checkAclsConfigFlag(fsd);
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ src = fsd.resolvePath(pc, src, pathComponents);
+ fsd.writeLock();
+ try {
+ INodesInPath iip = fsd.getINodesInPath4Write(src);
+ fsd.checkOwner(pc, iip);
+ List<AclEntry> newAcl = unprotectedSetAcl(fsd, src, aclSpec);
+ fsd.getEditLog().logSetAcl(src, newAcl);
+ } finally {
+ fsd.writeUnlock();
+ }
+ return fsd.getAuditFileInfo(src, false);
+ }
+
+ static AclStatus getAclStatus(
+ FSDirectory fsd, String src) throws IOException {
+ checkAclsConfigFlag(fsd);
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsd.resolvePath(pc, src, pathComponents);
+ String srcs = FSDirectory.normalizePath(src);
+ fsd.readLock();
+ try {
+ // There is no real inode for the path ending in ".snapshot", so return a
+ // non-null, unpopulated AclStatus. This is similar to getFileInfo.
+ if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) &&
+ fsd.getINode4DotSnapshot(srcs) != null) {
+ return new AclStatus.Builder().owner("").group("").build();
+ }
+ INodesInPath iip = fsd.getINodesInPath(srcs, true);
+ if (fsd.isPermissionEnabled()) {
+ fsd.checkTraverse(pc, iip);
+ }
+ INode inode = FSDirectory.resolveLastINode(srcs, iip);
+ int snapshotId = iip.getPathSnapshotId();
+ List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
+ return new AclStatus.Builder()
+ .owner(inode.getUserName()).group(inode.getGroupName())
+ .stickyBit(inode.getFsPermission(snapshotId).getStickyBit())
+ .addEntries(acl).build();
+ } finally {
+ fsd.readUnlock();
+ }
+ }
+
+ static List<AclEntry> unprotectedSetAcl(
+ FSDirectory fsd, String src, List<AclEntry> aclSpec)
+ throws IOException {
+ // ACL removal is logged to edits as OP_SET_ACL with an empty list.
+ if (aclSpec.isEmpty()) {
+ unprotectedRemoveAcl(fsd, src);
+ return AclFeature.EMPTY_ENTRY_LIST;
+ }
+
+ assert fsd.hasWriteLock();
+ INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath
+ (src), true);
+ INode inode = FSDirectory.resolveLastINode(src, iip);
+ int snapshotId = iip.getLatestSnapshotId();
+ List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
+ List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
+ aclSpec);
+ AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+ return newAcl;
+ }
+
+ private static void checkAclsConfigFlag(FSDirectory fsd) throws AclException {
+ if (!fsd.isAclsEnabled()) {
+ throw new AclException(String.format(
+ "The ACL operation has been rejected. "
+ + "Support for ACLs has been disabled by setting %s to false.",
+ DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY));
+ }
+ }
+
+ private static void unprotectedRemoveAcl(FSDirectory fsd, String src)
+ throws IOException {
+ assert fsd.hasWriteLock();
+ INodesInPath iip = fsd.getINodesInPath4Write(
+ FSDirectory.normalizePath(src), true);
+ INode inode = FSDirectory.resolveLastINode(src, iip);
+ int snapshotId = iip.getLatestSnapshotId();
+ AclFeature f = inode.getAclFeature();
+ if (f == null) {
+ return;
+ }
+
+ FsPermission perm = inode.getFsPermission();
+ List<AclEntry> featureEntries = AclStorage.getEntriesFromAclFeature(f);
+ if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
+ // Restore group permissions from the feature's entry to permission
+ // bits, overwriting the mask, which is not part of a minimal ACL.
+ AclEntry groupEntryKey = new AclEntry.Builder()
+ .setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).build();
+ int groupEntryIndex = Collections.binarySearch(
+ featureEntries, groupEntryKey,
+ AclTransformation.ACL_ENTRY_COMPARATOR);
+ assert groupEntryIndex >= 0;
+ FsAction groupPerm = featureEntries.get(groupEntryIndex).getPermission();
+ FsPermission newPerm = new FsPermission(perm.getUserAction(), groupPerm,
+ perm.getOtherAction(), perm.getStickyBit());
+ inode.setPermission(newPerm, snapshotId);
+ }
+
+ inode.removeAclFeature(snapshotId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 444589e..82741ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -143,6 +142,12 @@ public class FSDirectory implements Closeable {
private final ReentrantReadWriteLock dirLock;
private final boolean isPermissionEnabled;
+ /**
+ * Support for ACLs is controlled by a configuration flag. If the
+ * configuration flag is false, then the NameNode will reject all
+ * ACL-related operations.
+ */
+ private final boolean aclsEnabled;
private final String fsOwnerShortUserName;
private final String supergroup;
private final INodeId inodeId;
@@ -204,7 +209,10 @@ public class FSDirectory implements Closeable {
this.supergroup = conf.get(
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
-
+ this.aclsEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
+ DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT);
+ LOG.info("ACLs enabled? " + aclsEnabled);
int configuredLimit = conf.getInt(
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ?
@@ -263,6 +271,9 @@ public class FSDirectory implements Closeable {
boolean isPermissionEnabled() {
return isPermissionEnabled;
}
+ boolean isAclsEnabled() {
+ return aclsEnabled;
+ }
int getLsLimit() {
return lsLimit;
@@ -1549,140 +1560,6 @@ public class FSDirectory implements Closeable {
return addINode(path, symlink) ? symlink : null;
}
- List<AclEntry> modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
- writeLock();
- try {
- return unprotectedModifyAclEntries(src, aclSpec);
- } finally {
- writeUnlock();
- }
- }
-
- private List<AclEntry> unprotectedModifyAclEntries(String src,
- List<AclEntry> aclSpec) throws IOException {
- assert hasWriteLock();
- INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
- INode inode = resolveLastINode(src, iip);
- int snapshotId = iip.getLatestSnapshotId();
- List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
- List<AclEntry> newAcl = AclTransformation.mergeAclEntries(existingAcl,
- aclSpec);
- AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
- return newAcl;
- }
-
- List<AclEntry> removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
- writeLock();
- try {
- return unprotectedRemoveAclEntries(src, aclSpec);
- } finally {
- writeUnlock();
- }
- }
-
- private List<AclEntry> unprotectedRemoveAclEntries(String src,
- List<AclEntry> aclSpec) throws IOException {
- assert hasWriteLock();
- INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
- INode inode = resolveLastINode(src, iip);
- int snapshotId = iip.getLatestSnapshotId();
- List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
- List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
- existingAcl, aclSpec);
- AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
- return newAcl;
- }
-
- List<AclEntry> removeDefaultAcl(String src) throws IOException {
- writeLock();
- try {
- return unprotectedRemoveDefaultAcl(src);
- } finally {
- writeUnlock();
- }
- }
-
- private List<AclEntry> unprotectedRemoveDefaultAcl(String src)
- throws IOException {
- assert hasWriteLock();
- INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
- INode inode = resolveLastINode(src, iip);
- int snapshotId = iip.getLatestSnapshotId();
- List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
- List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
- existingAcl);
- AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
- return newAcl;
- }
-
- void removeAcl(String src) throws IOException {
- writeLock();
- try {
- unprotectedRemoveAcl(src);
- } finally {
- writeUnlock();
- }
- }
-
- private void unprotectedRemoveAcl(String src) throws IOException {
- assert hasWriteLock();
- INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
- INode inode = resolveLastINode(src, iip);
- int snapshotId = iip.getLatestSnapshotId();
- AclStorage.removeINodeAcl(inode, snapshotId);
- }
-
- List<AclEntry> setAcl(String src, List<AclEntry> aclSpec) throws IOException {
- writeLock();
- try {
- return unprotectedSetAcl(src, aclSpec);
- } finally {
- writeUnlock();
- }
- }
-
- List<AclEntry> unprotectedSetAcl(String src, List<AclEntry> aclSpec)
- throws IOException {
- // ACL removal is logged to edits as OP_SET_ACL with an empty list.
- if (aclSpec.isEmpty()) {
- unprotectedRemoveAcl(src);
- return AclFeature.EMPTY_ENTRY_LIST;
- }
-
- assert hasWriteLock();
- INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
- INode inode = resolveLastINode(src, iip);
- int snapshotId = iip.getLatestSnapshotId();
- List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
- List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
- aclSpec);
- AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
- return newAcl;
- }
-
- AclStatus getAclStatus(String src) throws IOException {
- String srcs = normalizePath(src);
- readLock();
- try {
- // There is no real inode for the path ending in ".snapshot", so return a
- // non-null, unpopulated AclStatus. This is similar to getFileInfo.
- if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) &&
- getINode4DotSnapshot(srcs) != null) {
- return new AclStatus.Builder().owner("").group("").build();
- }
- INodesInPath iip = getLastINodeInPath(srcs, true);
- INode inode = resolveLastINode(src, iip);
- int snapshotId = iip.getPathSnapshotId();
- List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
- return new AclStatus.Builder()
- .owner(inode.getUserName()).group(inode.getGroupName())
- .stickyBit(inode.getFsPermission(snapshotId).getStickyBit())
- .addEntries(acl).build();
- } finally {
- readUnlock();
- }
- }
-
/**
* Removes a list of XAttrs from an inode at a path.
*
@@ -2065,9 +1942,10 @@ public class FSDirectory implements Closeable {
return null;
}
- private static INode resolveLastINode(String src, INodesInPath iip)
+ static INode resolveLastINode(String src, INodesInPath iip)
throws FileNotFoundException {
- INode inode = iip.getLastINode();
+ INode[] inodes = iip.getINodes();
+ INode inode = inodes[inodes.length - 1];
if (inode == null)
throw new FileNotFoundException("cannot find " + src);
return inode;
@@ -2246,8 +2124,8 @@ public class FSDirectory implements Closeable {
}
/** @return the {@link INodesInPath} containing only the last inode. */
- private INodesInPath getLastINodeInPath(String path, boolean resolveLink
- ) throws UnresolvedLinkException {
+ INodesInPath getLastINodeInPath(
+ String path, boolean resolveLink) throws UnresolvedLinkException {
return INodesInPath.resolve(rootDir, INode.getPathComponents(path), 1,
resolveLink);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index f60f142..d63545b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -823,7 +823,7 @@ public class FSEditLogLoader {
}
case OP_SET_ACL: {
SetAclOp setAclOp = (SetAclOp) op;
- fsDir.unprotectedSetAcl(setAclOp.src, setAclOp.aclEntries);
+ FSDirAclOp.unprotectedSetAcl(fsDir, setAclOp.src, setAclOp.aclEntries);
break;
}
case OP_SET_XATTR: {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 982798f..e9ce78c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -532,7 +532,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private final RetryCache retryCache;
- private final boolean aclsEnabled;
private final boolean xattrsEnabled;
private final int xattrMaxSize;
@@ -851,10 +850,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
auditLoggers.get(0) instanceof DefaultAuditLogger;
this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
- this.aclsEnabled = conf.getBoolean(
- DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
- DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT);
- LOG.info("ACLs enabled? " + aclsEnabled);
this.xattrsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY,
DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_DEFAULT);
@@ -7731,158 +7726,105 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return results;
}
- void modifyAclEntries(final String srcArg, List<AclEntry> aclSpec)
+ void modifyAclEntries(final String src, List<AclEntry> aclSpec)
throws IOException {
- String src = srcArg;
- checkAclsConfigFlag();
- HdfsFileStatus resultingStat = null;
- FSPermissionChecker pc = getPermissionChecker();
+ HdfsFileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
- src = dir.resolvePath(pc, src, pathComponents);
- final INodesInPath iip = dir.getINodesInPath4Write(src);
- dir.checkOwner(pc, iip);
- List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
- getEditLog().logSetAcl(src, newAcl);
- resultingStat = getAuditFileInfo(src, false);
+ auditStat = FSDirAclOp.modifyAclEntries(dir, src, aclSpec);
} catch (AccessControlException e) {
- logAuditEvent(false, "modifyAclEntries", srcArg);
+ logAuditEvent(false, "modifyAclEntries", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
- logAuditEvent(true, "modifyAclEntries", srcArg, null, resultingStat);
+ logAuditEvent(true, "modifyAclEntries", src, null, auditStat);
}
- void removeAclEntries(final String srcArg, List<AclEntry> aclSpec)
+ void removeAclEntries(final String src, List<AclEntry> aclSpec)
throws IOException {
- String src = srcArg;
- checkAclsConfigFlag();
- HdfsFileStatus resultingStat = null;
- FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ HdfsFileStatus auditStat = null;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
- src = dir.resolvePath(pc, src, pathComponents);
- final INodesInPath iip = dir.getINodesInPath4Write(src);
- dir.checkOwner(pc, iip);
- List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
- getEditLog().logSetAcl(src, newAcl);
- resultingStat = getAuditFileInfo(src, false);
+ auditStat = FSDirAclOp.removeAclEntries(dir, src, aclSpec);
} catch (AccessControlException e) {
- logAuditEvent(false, "removeAclEntries", srcArg);
+ logAuditEvent(false, "removeAclEntries", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
- logAuditEvent(true, "removeAclEntries", srcArg, null, resultingStat);
+ logAuditEvent(true, "removeAclEntries", src, null, auditStat);
}
- void removeDefaultAcl(final String srcArg) throws IOException {
- String src = srcArg;
- checkAclsConfigFlag();
- HdfsFileStatus resultingStat = null;
- FSPermissionChecker pc = getPermissionChecker();
+ void removeDefaultAcl(final String src) throws IOException {
+ HdfsFileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
- src = dir.resolvePath(pc, src, pathComponents);
- final INodesInPath iip = dir.getINodesInPath4Write(src);
- dir.checkOwner(pc, iip);
- List<AclEntry> newAcl = dir.removeDefaultAcl(src);
- getEditLog().logSetAcl(src, newAcl);
- resultingStat = getAuditFileInfo(src, false);
+ auditStat = FSDirAclOp.removeDefaultAcl(dir, src);
} catch (AccessControlException e) {
- logAuditEvent(false, "removeDefaultAcl", srcArg);
+ logAuditEvent(false, "removeDefaultAcl", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
- logAuditEvent(true, "removeDefaultAcl", srcArg, null, resultingStat);
+ logAuditEvent(true, "removeDefaultAcl", src, null, auditStat);
}
- void removeAcl(final String srcArg) throws IOException {
- String src = srcArg;
- checkAclsConfigFlag();
- HdfsFileStatus resultingStat = null;
- FSPermissionChecker pc = getPermissionChecker();
+ void removeAcl(final String src) throws IOException {
+ HdfsFileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove ACL on " + src);
- src = dir.resolvePath(pc, src, pathComponents);
- final INodesInPath iip = dir.getINodesInPath4Write(src);
- dir.checkOwner(pc, iip);
- dir.removeAcl(src);
- getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
- resultingStat = getAuditFileInfo(src, false);
+ auditStat = FSDirAclOp.removeAcl(dir, src);
} catch (AccessControlException e) {
- logAuditEvent(false, "removeAcl", srcArg);
+ logAuditEvent(false, "removeAcl", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
- logAuditEvent(true, "removeAcl", srcArg, null, resultingStat);
+ logAuditEvent(true, "removeAcl", src, null, auditStat);
}
- void setAcl(final String srcArg, List<AclEntry> aclSpec) throws IOException {
- String src = srcArg;
- checkAclsConfigFlag();
- HdfsFileStatus resultingStat = null;
- FSPermissionChecker pc = getPermissionChecker();
+ void setAcl(final String src, List<AclEntry> aclSpec) throws IOException {
+ HdfsFileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set ACL on " + src);
- src = dir.resolvePath(pc, src, pathComponents);
- final INodesInPath iip = dir.getINodesInPath4Write(src);
- dir.checkOwner(pc, iip);
- List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
- getEditLog().logSetAcl(src, newAcl);
- resultingStat = getAuditFileInfo(src, false);
+ auditStat = FSDirAclOp.setAcl(dir, src, aclSpec);
} catch (AccessControlException e) {
- logAuditEvent(false, "setAcl", srcArg);
+ logAuditEvent(false, "setAcl", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
- logAuditEvent(true, "setAcl", srcArg, null, resultingStat);
+ logAuditEvent(true, "setAcl", src, null, auditStat);
}
AclStatus getAclStatus(String src) throws IOException {
- checkAclsConfigFlag();
- FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean success = false;
readLock();
try {
checkOperation(OperationCategory.READ);
- src = dir.resolvePath(pc, src, pathComponents);
- INodesInPath iip = dir.getINodesInPath(src, true);
- if (isPermissionEnabled) {
- dir.checkPermission(pc, iip, false, null, null, null, null);
- }
- final AclStatus ret = dir.getAclStatus(src);
+ final AclStatus ret = FSDirAclOp.getAclStatus(dir, src);
success = true;
return ret;
} finally {
@@ -8369,15 +8311,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
- private void checkAclsConfigFlag() throws AclException {
- if (!aclsEnabled) {
- throw new AclException(String.format(
- "The ACL operation has been rejected. "
- + "Support for ACLs has been disabled by setting %s to false.",
- DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY));
- }
- }
-
private void checkXAttrsConfigFlag() throws IOException {
if (!xattrsEnabled) {
throw new IOException(String.format(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9297f980/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
index c91cd75..0c119bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
@@ -18,21 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
-import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.anyListOf;
-import static org.mockito.Matchers.anyString;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +38,22 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+
/**
* Tests for the {@link AuditLogger} custom audit logging interface.
*/
@@ -208,27 +209,10 @@ public class TestAuditLogger {
try {
cluster.waitClusterUp();
final FSDirectory dir = cluster.getNamesystem().getFSDirectory();
- // Set up mock FSDirectory to test FSN audit logging during failure
+
final FSDirectory mockedDir = Mockito.spy(dir);
- Mockito.doThrow(new AccessControlException("mock setAcl exception")).
- when(mockedDir).
- setAcl(anyString(), anyListOf(AclEntry.class));
- Mockito.doThrow(new AccessControlException("mock getAclStatus exception")).
- when(mockedDir).
- getAclStatus(anyString());
- Mockito.doThrow(new AccessControlException("mock removeAcl exception")).
- when(mockedDir).
- removeAcl(anyString());
- Mockito.doThrow(new AccessControlException("mock removeDefaultAcl exception")).
- when(mockedDir).
- removeDefaultAcl(anyString());
- Mockito.doThrow(new AccessControlException("mock removeAclEntries exception")).
- when(mockedDir).
- removeAclEntries(anyString(), anyListOf(AclEntry.class));
- Mockito.doThrow(new AccessControlException("mock modifyAclEntries exception")).
- when(mockedDir).
- modifyAclEntries(anyString(), anyListOf(AclEntry.class));
- // Replace the FSD with the mock FSD.
+ AccessControlException ex = new AccessControlException();
+ doThrow(ex).when(mockedDir).getPermissionChecker();
cluster.getNamesystem().setFSDirectory(mockedDir);
assertTrue(DummyAuditLogger.initialized);
DummyAuditLogger.resetLogCount();
@@ -239,39 +223,28 @@ public class TestAuditLogger {
try {
fs.getAclStatus(p);
- } catch (AccessControlException e) {
- assertExceptionContains("mock getAclStatus exception", e);
- }
+ } catch (AccessControlException ignored) {}
try {
fs.setAcl(p, acls);
- } catch (AccessControlException e) {
- assertExceptionContains("mock setAcl exception", e);
- }
+ } catch (AccessControlException ignored) {}
try {
fs.removeAcl(p);
- } catch (AccessControlException e) {
- assertExceptionContains("mock removeAcl exception", e);
- }
+ } catch (AccessControlException ignored) {}
try {
fs.removeDefaultAcl(p);
- } catch (AccessControlException e) {
- assertExceptionContains("mock removeDefaultAcl exception", e);
- }
+ } catch (AccessControlException ignored) {}
try {
fs.removeAclEntries(p, acls);
- } catch (AccessControlException e) {
- assertExceptionContains("mock removeAclEntries exception", e);
- }
+ } catch (AccessControlException ignored) {}
try {
fs.modifyAclEntries(p, acls);
- } catch (AccessControlException e) {
- assertExceptionContains("mock modifyAclEntries exception", e);
- }
+ } catch (AccessControlException ignored) {}
+
assertEquals(6, DummyAuditLogger.logCount);
assertEquals(6, DummyAuditLogger.unsuccessfulCount);
} finally {