You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2014/12/24 20:35:58 UTC

[44/50] hadoop git commit: HDFS-7456. De-duplicate AclFeature instances with same AclEntries to reduce memory footprint of NameNode (Contributed by Vinayakumar B)

HDFS-7456. De-duplicate AclFeature instances with same AclEntries to reduce memory footprint of NameNode (Contributed by Vinayakumar B)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7efc73c9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7efc73c9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7efc73c9

Branch: refs/heads/HDFS-EC
Commit: 7efc73c9fd2842acc52ad27bc98e33e19c0d0b5e
Parents: 7b2c667
Author: Vinayakumar B <vi...@apache.org>
Authored: Tue Dec 23 12:35:53 2014 +0530
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Wed Dec 24 11:22:19 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/server/namenode/AclFeature.java |  37 +++-
 .../hadoop/hdfs/server/namenode/AclStorage.java |  30 ++-
 .../hdfs/server/namenode/INodeAttributes.java   |   9 +-
 .../hdfs/server/namenode/INodeDirectory.java    |   9 +
 .../hadoop/hdfs/server/namenode/INodeFile.java  |   3 +
 .../namenode/INodeWithAdditionalFields.java     |   5 +-
 .../snapshot/DirectoryWithSnapshotFeature.java  |   5 +
 .../snapshot/FileWithSnapshotFeature.java       |   6 +
 .../hadoop/hdfs/util/ReferenceCountMap.java     | 118 +++++++++++
 .../hdfs/server/namenode/FSAclBaseTest.java     | 207 ++++++++++++++++++-
 .../server/namenode/TestFileContextAcl.java     |   6 +-
 .../hdfs/server/namenode/TestNameNodeAcl.java   |   6 +-
 .../namenode/snapshot/TestAclWithSnapshot.java  | 189 ++++++++++++++++-
 .../apache/hadoop/hdfs/web/TestWebHDFSAcl.java  |   6 +-
 15 files changed, 614 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 07a78c2..66b3a8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -630,6 +630,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7560. ACLs removed by removeDefaultAcl() will be back after NameNode
     restart/failover. (Vinayakumar B via cnauroth)
 
+    HDFS-7456. De-duplicate AclFeature instances with same AclEntries to reduce
+    memory footprint of NameNode (vinayakumarb)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java
index e097b05..97d4759 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java
@@ -18,8 +18,11 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.Arrays;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.hdfs.util.ReferenceCountMap.ReferenceCounter;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -28,9 +31,10 @@ import com.google.common.collect.ImmutableList;
  * Feature that represents the ACLs of the inode.
  */
 @InterfaceAudience.Private
-public class AclFeature implements INode.Feature {
+public class AclFeature implements INode.Feature, ReferenceCounter {
   public static final ImmutableList<AclEntry> EMPTY_ENTRY_LIST =
     ImmutableList.of();
+  private int refCount = 0;
 
   private final int [] entries;
 
@@ -56,4 +60,35 @@ public class AclFeature implements INode.Feature {
         "Invalid position for AclEntry");
     return entries[pos];
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null) {
+      return false;
+    }
+    if (getClass() != o.getClass()) { // exact class match; subclasses differ
+      return false;
+    }
+    return Arrays.equals(entries, ((AclFeature) o).entries); // by content
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(entries); // content-based, consistent with equals()
+  }
+
+  @Override
+  public int getRefCount() {
+    return refCount; // 0 until incrementAndGetRefCount() is first called
+  }
+
+  @Override
+  public int incrementAndGetRefCount() {
+    return ++refCount; // pre-increment: the updated count is returned
+  }
+
+  @Override
+  public int decrementAndGetRefCount() {
+    return (refCount > 0) ? --refCount : 0; // floors at 0, never negative
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
index a866046..4f6ce3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.ScopedAclEntries;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.util.ReferenceCountMap;
 
 /**
  * AclStorage contains utility methods that define how ACL data is stored in the
@@ -61,7 +62,10 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
  * {@link AclTransformation}.
  */
 @InterfaceAudience.Private
-final class AclStorage {
+public final class AclStorage {
+
+  private final static ReferenceCountMap<AclFeature> UNIQUE_ACL_FEATURES =
+      new ReferenceCountMap<AclFeature>();
 
   /**
    * If a default ACL is defined on a parent directory, then copies that default
@@ -359,4 +363,28 @@ final class AclStorage {
       accessEntries.get(2).getPermission(),
       existingPerm.getStickyBit());
   }
+
+  @VisibleForTesting
+  public static ReferenceCountMap<AclFeature> getUniqueAclFeatures() {
+    return UNIQUE_ACL_FEATURES; // the live static map itself, not a copy
+  }
+
+  /**
+   * Register a reference to the given AclFeature in the de-duplication map.
+   * 
+   * @param aclFeature feature to register; an equal instance may already exist
+   * @return the instance held by the map, which callers should store and use
+   */
+  public static AclFeature addAclFeature(AclFeature aclFeature) {
+    return UNIQUE_ACL_FEATURES.put(aclFeature);
+  }
+
+  /**
+   * Drop one reference to the AclFeature from the de-duplication map.
+   * 
+   * @param aclFeature feature whose reference count should be decremented
+   */
+  public static void removeAclFeature(AclFeature aclFeature) {
+    UNIQUE_ACL_FEATURES.remove(aclFeature);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
index 8b0a5f0..0f76b68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
@@ -75,6 +75,9 @@ public interface INodeAttributes {
         XAttrFeature xAttrFeature) {
       this.name = name;
       this.permission = PermissionStatusFormat.toLong(permissions);
+      if (aclFeature != null) {
+        aclFeature = AclStorage.addAclFeature(aclFeature);
+      }
       this.aclFeature = aclFeature;
       this.modificationTime = modificationTime;
       this.accessTime = accessTime;
@@ -84,7 +87,11 @@ public interface INodeAttributes {
     SnapshotCopy(INode inode) {
       this.name = inode.getLocalNameBytes();
       this.permission = inode.getPermissionLong();
-      this.aclFeature = inode.getAclFeature();
+      if (inode.getAclFeature() != null) {
+        aclFeature = AclStorage.addAclFeature(inode.getAclFeature());
+      } else {
+        aclFeature = null;
+      }
       this.modificationTime = inode.getModificationTime();
       this.accessTime = inode.getAccessTime();
       this.xAttrFeature = inode.getXAttrFeature();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
index 797a62c..9381426 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
@@ -93,6 +93,12 @@ public class INodeDirectory extends INodeWithAdditionalFields
       }
     }
     this.features = featuresToCopy;
+    AclFeature aclFeature = getFeature(AclFeature.class);
+    if (aclFeature != null) {
+      // for the de-duplication of AclFeature
+      removeFeature(aclFeature);
+      addFeature(AclStorage.addAclFeature(aclFeature));
+    }
   }
 
   /** @return true unconditionally. */
@@ -764,6 +770,9 @@ public class INodeDirectory extends INodeWithAdditionalFields
     for (INode child : getChildrenList(Snapshot.CURRENT_STATE_ID)) {
       child.destroyAndCollectBlocks(collectedBlocks, removedINodes);
     }
+    if (getAclFeature() != null) {
+      AclStorage.removeAclFeature(getAclFeature());
+    }
     clear();
     removedINodes.add(this);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index ccf3df1..b811f12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -504,6 +504,9 @@ public class INodeFile extends INodeWithAdditionalFields
       }
     }
     setBlocks(null);
+    if (getAclFeature() != null) {
+      AclStorage.removeAclFeature(getAclFeature());
+    }
     clear();
     removedINodes.add(this);
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
index 93da052..5ec0b55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
@@ -219,7 +219,7 @@ public abstract class INodeWithAdditionalFields extends INode
   }
 
   @Override
-  final AclFeature getAclFeature(int snapshotId) {
+  public final AclFeature getAclFeature(int snapshotId) {
     if (snapshotId != Snapshot.CURRENT_STATE_ID) {
       return getSnapshotINode(snapshotId).getAclFeature();
     }
@@ -330,6 +330,7 @@ public abstract class INodeWithAdditionalFields extends INode
     AclFeature f = getAclFeature();
     Preconditions.checkNotNull(f);
     removeFeature(f);
+    AclStorage.removeAclFeature(f);
   }
 
   public void addAclFeature(AclFeature f) {
@@ -337,7 +338,7 @@ public abstract class INodeWithAdditionalFields extends INode
     if (f1 != null)
       throw new IllegalStateException("Duplicated ACLFeature");
 
-    addFeature(f);
+    addFeature(AclStorage.addAclFeature(f));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
index 9c9d435..145d4f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.AclStorage;
 import org.apache.hadoop.hdfs.server.namenode.Content;
 import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
@@ -317,6 +318,10 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
       // this diff has been deleted
       Quota.Counts counts = Quota.Counts.newInstance();
       counts.add(diff.destroyDeletedList(collectedBlocks, removedINodes));
+      INodeDirectoryAttributes snapshotINode = getSnapshotINode();
+      if (snapshotINode != null && snapshotINode.getAclFeature() != null) {
+        AclStorage.removeAclFeature(snapshotINode.getAclFeature());
+      }
       return counts;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
index 9e57b15..e3bf349 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
@@ -22,8 +22,10 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.AclFeature;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.AclStorage;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
@@ -148,6 +150,10 @@ public class FileWithSnapshotFeature implements INode.Feature {
       } else if (replication > currentRepl) {  
         oldDiskspace = oldDiskspace / file.getBlockReplication() * replication;
       }
+      AclFeature aclFeature = removed.getSnapshotINode().getAclFeature();
+      if (aclFeature != null) {
+        AclStorage.removeAclFeature(aclFeature);
+      }
     }
     
     collectBlocksAndClear(file, collectedBlocks, removedINodes);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ReferenceCountMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ReferenceCountMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ReferenceCountMap.java
new file mode 100644
index 0000000..5b29c43
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ReferenceCountMap.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Class for de-duplication of instances. <br>
+ * Holds one canonical instance per set of equal instances and tracks how many
+ * references it has. When that count reaches zero the entry is removed.<br>
+ * Type E must implement {@link ReferenceCounter}<br>
+ * Note: This class is NOT thread-safe.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReferenceCountMap<E extends ReferenceCountMap.ReferenceCounter> {
+
+  private Map<E, E> referenceMap = new HashMap<E, E>();
+
+  /**
+   * Add a reference. If an equal instance is already present, only its
+   * reference count is increased and the passed key is not stored.
+   * 
+   * @param key Key to put in reference map
+   * @return the instance held by the map (the existing one, or key if new)
+   */
+  public E put(E key) {
+    E value = referenceMap.get(key);
+    if (value == null) {
+      value = key;
+      referenceMap.put(key, value);
+    }
+    value.incrementAndGetRefCount();
+    return value;
+  }
+
+  /**
+   * Delete a reference. Decrease the reference count for the instance, if
+   * present; once the count reaches zero the instance is dropped from the map.
+   * 
+   * @param key Key to remove the reference.
+   */
+  public void remove(E key) {
+    E value = referenceMap.get(key);
+    if (value != null && value.decrementAndGetRefCount() == 0) {
+      referenceMap.remove(key);
+    }
+  }
+
+  /**
+   * Get entries in the reference Map.
+   * 
+   * @return immutable copy of the instances currently held by the map
+   */
+  @VisibleForTesting
+  public ImmutableList<E> getEntries() {
+    return new ImmutableList.Builder<E>().addAll(referenceMap.keySet()).build();
+  }
+
+  /**
+   * Get the reference count for the key, or 0 if the key is not in the map
+   */
+  public long getReferenceCount(E key) {
+    ReferenceCounter counter = referenceMap.get(key);
+    if (counter != null) {
+      return counter.getRefCount();
+    }
+    return 0;
+  }
+
+  /**
+   * Get the number of unique elements
+   */
+  public int getUniqueElementsSize() {
+    return referenceMap.size();
+  }
+
+  /**
+   * Clear the map. Counts held by the instances themselves are not reset.
+   */
+  @VisibleForTesting
+  public void clear() {
+    referenceMap.clear();
+  }
+
+  /**
+   * Interface for the reference count holder
+   */
+  public static interface ReferenceCounter {
+    public int getRefCount();
+
+    public int incrementAndGetRefCount();
+
+    public int decrementAndGetRefCount();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
index 528dfff..f481bc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
@@ -25,7 +25,9 @@ import static org.junit.Assert.*;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -79,6 +82,12 @@ public abstract class FSAclBaseTest {
 
   private FileSystem fs, fsAsBruce, fsAsDiana, fsAsSupergroupMember, fsAsBob;
 
+  protected static void startCluster() throws IOException { // needs conf set
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+  }
+
   @AfterClass
   public static void shutdown() {
     if (cluster != null) {
@@ -1387,6 +1396,187 @@ public abstract class FSAclBaseTest {
   }
 
   /**
+   * Verify the de-duplication of AclFeatures with same entries.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testDeDuplication() throws Exception {
+    // This test needs to verify the count of the references which is held by
+    // static data structure. So shutting down entire cluster to get the fresh
+    // data.
+    shutdown();
+    AclStorage.getUniqueAclFeatures().clear();
+    startCluster();
+    setUp();
+    int currentSize = 0;
+    Path p1 = new Path("/testDeduplication");
+    {
+      // unique default AclEntries for this test
+      List<AclEntry> aclSpec = Lists.newArrayList(
+          aclEntry(DEFAULT, USER, "testdeduplicateuser", ALL),
+          aclEntry(DEFAULT, GROUP, "testdeduplicategroup", ALL));
+      fs.mkdirs(p1);
+      fs.modifyAclEntries(p1, aclSpec);
+      assertEquals("One more ACL feature should be unique", currentSize + 1,
+          AclStorage.getUniqueAclFeatures().getUniqueElementsSize());
+      currentSize++;
+    }
+    Path child1 = new Path(p1, "child1");
+    AclFeature child1AclFeature;
+    {
+      // new child dir should copy entries from its parent.
+      fs.mkdirs(child1);
+      assertEquals("One more ACL feature should be unique", currentSize + 1,
+          AclStorage.getUniqueAclFeatures().getUniqueElementsSize());
+      child1AclFeature = getAclFeature(child1, cluster);
+      assertEquals("Reference count should be 1", 1,
+          child1AclFeature.getRefCount());
+      currentSize++;
+    }
+    Path child2 = new Path(p1, "child2");
+    {
+      // child2 copies the same entries from the parent as its sibling did, so
+      // it must share child1's AclFeature instance instead of adding a new one.
+      fs.mkdirs(child2);
+      assertEquals("existing AclFeature should be re-used", currentSize,
+          AclStorage.getUniqueAclFeatures().getUniqueElementsSize());
+      AclFeature child2AclFeature = getAclFeature(child2, cluster);
+      assertSame("Same Aclfeature should be re-used", child1AclFeature,
+          child2AclFeature);
+      assertEquals("Reference count should be 2", 2,
+          child2AclFeature.getRefCount());
+    }
+    {
+      // modifying the ACL of child1 should decrement the reference count of
+      // the original feature and increment that of the new one.
+      List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(ACCESS, USER,
+          "user1", ALL));
+      fs.modifyAclEntries(child1, aclSpec);
+      AclFeature modifiedAclFeature = getAclFeature(child1, cluster);
+      assertEquals("Old Reference count should be 1", 1,
+          child1AclFeature.getRefCount());
+      assertEquals("New Reference count should be 1", 1,
+          modifiedAclFeature.getRefCount());
+
+      // removing the new added ACL entry should refer to old ACLfeature
+      AclEntry aclEntry = new AclEntry.Builder().setScope(ACCESS).setType(USER)
+          .setName("user1").build();
+      fs.removeAclEntries(child1, Lists.newArrayList(aclEntry));
+      assertEquals("Old Reference count should be 2 again", 2,
+          child1AclFeature.getRefCount());
+      assertEquals("New Reference count should be 0", 0,
+          modifiedAclFeature.getRefCount());
+    }
+    {
+      // verify the reference count on deletion of Acls
+      fs.removeAcl(child2);
+      assertEquals("Reference count should be 1", 1,
+          child1AclFeature.getRefCount());
+    }
+    {
+      // verify the reference count on deletion of dir with ACL
+      fs.delete(child1, true);
+      assertEquals("Reference count should be 0", 0,
+          child1AclFeature.getRefCount());
+    }
+
+    Path file1 = new Path(p1, "file1");
+    Path file2 = new Path(p1, "file2");
+    AclFeature fileAclFeature;
+    {
+      // Using same reference on creation of file
+      fs.create(file1).close();
+      fileAclFeature = getAclFeature(file1, cluster);
+      assertEquals("Reference count should be 1", 1,
+          fileAclFeature.getRefCount());
+      fs.create(file2).close();
+      assertEquals("Reference count should be 2", 2,
+          fileAclFeature.getRefCount());
+    }
+    {
+      // modifying ACLs on file should decrease the reference count on old
+      // instance and increase on the new instance
+      List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(ACCESS, USER,
+          "user1", ALL));
+      // adding new ACL entry
+      fs.modifyAclEntries(file1, aclSpec);
+      AclFeature modifiedFileAcl = getAclFeature(file1, cluster);
+      assertEquals("Old Reference count should be 1", 1,
+          fileAclFeature.getRefCount());
+      assertEquals("New Reference count should be 1", 1,
+          modifiedFileAcl.getRefCount());
+
+      // removing the new added ACL entry should refer to old ACLfeature
+      AclEntry aclEntry = new AclEntry.Builder().setScope(ACCESS).setType(USER)
+          .setName("user1").build();
+      fs.removeAclEntries(file1, Lists.newArrayList(aclEntry));
+      assertEquals("Old Reference count should be 2", 2,
+          fileAclFeature.getRefCount());
+      assertEquals("New Reference count should be 0", 0,
+          modifiedFileAcl.getRefCount());
+    }
+    {
+      // reference count should be decreased on deletion of files with ACLs
+      fs.delete(file2, true);
+      assertEquals("Reference count should be decreased on delete of the file",
+          1, fileAclFeature.getRefCount());
+      fs.delete(file1, true);
+      assertEquals("Reference count should be decreased on delete of the file",
+          0, fileAclFeature.getRefCount());
+
+      // Once the reference count reaches 0 the instance is removed from the map
+      fs.create(file1).close();
+      AclFeature newFileAclFeature = getAclFeature(file1, cluster);
+      assertNotSame("Instance should be different on reference count 0",
+          fileAclFeature, newFileAclFeature);
+      fileAclFeature = newFileAclFeature;
+    }
+    Map<AclFeature, Integer> restartRefCounter = new HashMap<>();
+    // Restart the Namenode to check the references.
+    // Here reference counts will not be same after restart because, while
+    // shutting down namenode will not call any removal of AclFeature.
+    // However this is applicable only in case of tests as in real-cluster JVM
+    // itself will be new.
+    List<AclFeature> entriesBeforeRestart = AclStorage.getUniqueAclFeatures()
+        .getEntries();
+    {
+      //restart by loading edits
+      for (AclFeature aclFeature : entriesBeforeRestart) {
+        restartRefCounter.put(aclFeature, aclFeature.getRefCount());
+      }
+      cluster.restartNameNode(true);
+      List<AclFeature> entriesAfterRestart = AclStorage.getUniqueAclFeatures()
+          .getEntries();
+      assertEquals("Entries before and after should be same",
+          entriesBeforeRestart, entriesAfterRestart);
+      for (AclFeature aclFeature : entriesAfterRestart) {
+        int before = restartRefCounter.get(aclFeature);
+        assertEquals("ReferenceCount After Restart should be doubled",
+            before * 2, aclFeature.getRefCount());
+      }
+    }
+    {
+      //restart by loading fsimage
+      cluster.getNameNodeRpc()
+          .setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
+      cluster.getNameNodeRpc().saveNamespace();
+      cluster.getNameNodeRpc()
+          .setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
+      cluster.restartNameNode(true);
+      List<AclFeature> entriesAfterRestart = AclStorage.getUniqueAclFeatures()
+          .getEntries();
+      assertEquals("Entries before and after should be same",
+          entriesBeforeRestart, entriesAfterRestart);
+      for (AclFeature aclFeature : entriesAfterRestart) {
+        int before = restartRefCounter.get(aclFeature);
+        assertEquals("ReferenceCount After 2 Restarts should be tripled",
+            before * 3, aclFeature.getRefCount());
+      }
+    }
+  }
+
+  /**
    * Creates a FileSystem for the super-user.
    *
    * @return FileSystem for super-user
@@ -1457,10 +1647,7 @@ public abstract class FSAclBaseTest {
    */
   private static void assertAclFeature(Path pathToCheck,
       boolean expectAclFeature) throws IOException {
-    INode inode = cluster.getNamesystem().getFSDirectory()
-      .getINode(pathToCheck.toUri().getPath(), false);
-    assertNotNull(inode);
-    AclFeature aclFeature = inode.getAclFeature();
+    AclFeature aclFeature = getAclFeature(pathToCheck, cluster);
     if (expectAclFeature) {
       assertNotNull(aclFeature);
       // Intentionally capturing a reference to the entries, not using nested
@@ -1475,6 +1662,18 @@ public abstract class FSAclBaseTest {
   }
 
   /**
+   * Get AclFeature for the path
+   */
+  public static AclFeature getAclFeature(Path pathToCheck,
+      MiniDFSCluster cluster) throws IOException {
+    INode inode = cluster.getNamesystem().getFSDirectory()
+        .getINode(pathToCheck.toUri().getPath(), false);
+    assertNotNull(inode); // the path must resolve to an inode
+    AclFeature aclFeature = inode.getAclFeature(); // null when no ACL is set
+    return aclFeature;
+  }
+
+  /**
    * Asserts the value of the FsPermission bits on the inode of the test path.
    *
    * @param perm short expected permission bits

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileContextAcl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileContextAcl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileContextAcl.java
index 51baa38..f9a6889 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileContextAcl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileContextAcl.java
@@ -27,9 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.BeforeClass;
 
 /**
@@ -40,9 +38,7 @@ public class TestFileContextAcl extends FSAclBaseTest {
   @BeforeClass
   public static void init() throws Exception {
     conf = new Configuration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
+    startCluster();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java
index 7fe4559..6a36c98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.BeforeClass;
 
 /**
@@ -31,8 +29,6 @@ public class TestNameNodeAcl extends FSAclBaseTest {
   @BeforeClass
   public static void init() throws Exception {
     conf = new Configuration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
+    startCluster();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java
index 3be1d36..fc9cd04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.fs.permission.AclEntryType.*;
 import static org.apache.hadoop.fs.permission.FsAction.*;
 import static org.junit.Assert.*;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,13 +40,15 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.server.namenode.AclFeature;
+import org.apache.hadoop.hdfs.server.namenode.AclStorage;
 import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
+import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
-
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -657,6 +660,190 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] { }, returned);
   }
 
+  @Test
+  public void testDeDuplication() throws Exception {
+    int startSize = AclStorage.getUniqueAclFeatures().getUniqueElementsSize();
+    // unique default AclEntries for this test
+    List<AclEntry> aclSpec = Lists.newArrayList(
+        aclEntry(ACCESS, USER, "testdeduplicateuser", ALL),
+        aclEntry(ACCESS, GROUP, "testdeduplicategroup", ALL));
+    hdfs.mkdirs(path);
+    hdfs.modifyAclEntries(path, aclSpec);
+    assertEquals("One more ACL feature should be unique", startSize + 1,
+        AclStorage.getUniqueAclFeatures().getUniqueElementsSize());
+    Path subdir = new Path(path, "sub-dir");
+    hdfs.mkdirs(subdir);
+    Path file = new Path(path, "file");
+    hdfs.create(file).close();
+    AclFeature aclFeature;
+    {
+      // creating a snapshot of a root directory having ACLs should refer to
+      // the same AclFeature and increment its reference count
+      aclFeature = FSAclBaseTest.getAclFeature(path, cluster);
+      assertEquals("Reference count should be one before snapshot", 1,
+          aclFeature.getRefCount());
+      Path snapshotPath = SnapshotTestHelper.createSnapshot(hdfs, path,
+          snapshotName);
+      AclFeature snapshotAclFeature = FSAclBaseTest.getAclFeature(snapshotPath,
+          cluster);
+      assertSame(aclFeature, snapshotAclFeature);
+      assertEquals("Reference count should be increased", 2,
+          snapshotAclFeature.getRefCount());
+    }
+    {
+      // deleting the snapshot of the root directory having ACLs should drop
+      // only the snapshot's reference to the AclFeature (count goes 2 -> 1)
+      deleteSnapshotWithAclAndVerify(aclFeature, path, startSize);
+    }
+    {
+      hdfs.modifyAclEntries(subdir, aclSpec);
+      aclFeature = FSAclBaseTest.getAclFeature(subdir, cluster);
+      assertEquals("Reference count should be 1", 1, aclFeature.getRefCount());
+      Path snapshotPath = SnapshotTestHelper.createSnapshot(hdfs, path,
+          snapshotName);
+      Path subdirInSnapshot = new Path(snapshotPath, "sub-dir");
+      AclFeature snapshotAcl = FSAclBaseTest.getAclFeature(subdirInSnapshot,
+          cluster);
+      assertSame(aclFeature, snapshotAcl);
+      assertEquals("Reference count should remain same", 1,
+          aclFeature.getRefCount());
+
+      // Delete the snapshot with sub-directory containing the ACLs should not
+      // alter the reference count for AclFeature
+      deleteSnapshotWithAclAndVerify(aclFeature, subdir, startSize);
+    }
+    {
+      hdfs.modifyAclEntries(file, aclSpec);
+      aclFeature = FSAclBaseTest.getAclFeature(file, cluster);
+      assertEquals("Reference count should be 1", 1, aclFeature.getRefCount());
+      Path snapshotPath = SnapshotTestHelper.createSnapshot(hdfs, path,
+          snapshotName);
+      Path fileInSnapshot = new Path(snapshotPath, file.getName());
+      AclFeature snapshotAcl = FSAclBaseTest.getAclFeature(fileInSnapshot,
+          cluster);
+      assertSame(aclFeature, snapshotAcl);
+      assertEquals("Reference count should remain same", 1,
+          aclFeature.getRefCount());
+
+      // Delete the snapshot with contained file having ACLs should not
+      // alter the reference count for AclFeature
+      deleteSnapshotWithAclAndVerify(aclFeature, file, startSize);
+    }
+    {
+      // Modifying the ACLs of the snapshot root directory should make it refer
+      // to a new AclFeature; the snapshot keeps the old one
+      hdfs.modifyAclEntries(path, aclSpec);
+      Path snapshotPath = SnapshotTestHelper.createSnapshot(hdfs, path,
+          snapshotName);
+      AclFeature snapshotAcl = FSAclBaseTest.getAclFeature(snapshotPath,
+          cluster);
+      aclFeature = FSAclBaseTest.getAclFeature(path, cluster);
+      assertEquals("Before modification same ACL should be referenced twice", 2,
+          aclFeature.getRefCount());
+      List<AclEntry> newAcl = Lists.newArrayList(aclEntry(ACCESS, USER,
+          "testNewUser", ALL));
+      hdfs.modifyAclEntries(path, newAcl);
+      aclFeature = FSAclBaseTest.getAclFeature(path, cluster);
+      AclFeature snapshotAclPostModification = FSAclBaseTest.getAclFeature(
+          snapshotPath, cluster);
+      assertSame(snapshotAcl, snapshotAclPostModification);
+      assertNotSame(aclFeature, snapshotAclPostModification);
+      assertEquals("Old ACL feature reference count should be same", 1,
+          snapshotAcl.getRefCount());
+      assertEquals("New ACL feature reference should be used", 1,
+          aclFeature.getRefCount());
+      deleteSnapshotWithAclAndVerify(aclFeature, path, startSize);
+    }
+    {
+      // Modifying the ACLs of a sub-directory of the snapshot root should make
+      // it refer to a new AclFeature; the snapshot keeps the old one
+      hdfs.modifyAclEntries(subdir, aclSpec);
+      Path snapshotPath = SnapshotTestHelper.createSnapshot(hdfs, path,
+          snapshotName);
+      Path subdirInSnapshot = new Path(snapshotPath, "sub-dir");
+      AclFeature snapshotAclFeature = FSAclBaseTest.getAclFeature(
+          subdirInSnapshot, cluster);
+      List<AclEntry> newAcl = Lists.newArrayList(aclEntry(ACCESS, USER,
+          "testNewUser", ALL));
+      hdfs.modifyAclEntries(subdir, newAcl);
+      aclFeature = FSAclBaseTest.getAclFeature(subdir, cluster);
+      assertNotSame(aclFeature, snapshotAclFeature);
+      assertEquals("Reference count should remain same", 1,
+          snapshotAclFeature.getRefCount());
+      assertEquals("New AclFeature should be used", 1, aclFeature.getRefCount());
+
+      deleteSnapshotWithAclAndVerify(aclFeature, subdir, startSize);
+    }
+    {
+      // Modifying the ACLs of a file inside the snapshot root should make it
+      // refer to a new AclFeature; the snapshot keeps the old one
+      hdfs.modifyAclEntries(file, aclSpec);
+      Path snapshotPath = SnapshotTestHelper.createSnapshot(hdfs, path,
+          snapshotName);
+      Path fileInSnapshot = new Path(snapshotPath, file.getName());
+      AclFeature snapshotAclFeature = FSAclBaseTest.getAclFeature(
+          fileInSnapshot, cluster);
+      List<AclEntry> newAcl = Lists.newArrayList(aclEntry(ACCESS, USER,
+          "testNewUser", ALL));
+      hdfs.modifyAclEntries(file, newAcl);
+      aclFeature = FSAclBaseTest.getAclFeature(file, cluster);
+      assertNotSame(aclFeature, snapshotAclFeature);
+      assertEquals("Reference count should remain same", 1,
+          snapshotAclFeature.getRefCount());
+      deleteSnapshotWithAclAndVerify(aclFeature, file, startSize);
+    }
+    {
+      // deleting the original directory containing dirs and files with ACLs
+      // with snapshot
+      hdfs.delete(path, true);
+      Path dir = new Path(subdir, "dir");
+      hdfs.mkdirs(dir);
+      hdfs.modifyAclEntries(dir, aclSpec);
+      file = new Path(subdir, "file");
+      hdfs.create(file).close();
+      aclSpec.add(aclEntry(ACCESS, USER, "testNewUser", ALL));
+      hdfs.modifyAclEntries(file, aclSpec);
+      AclFeature fileAcl = FSAclBaseTest.getAclFeature(file, cluster);
+      AclFeature dirAcl = FSAclBaseTest.getAclFeature(dir, cluster);
+      Path snapshotPath = SnapshotTestHelper.createSnapshot(hdfs, path,
+          snapshotName);
+      Path dirInSnapshot = new Path(snapshotPath, "sub-dir/dir");
+      AclFeature snapshotDirAclFeature = FSAclBaseTest.getAclFeature(
+          dirInSnapshot, cluster);
+      Path fileInSnapshot = new Path(snapshotPath, "sub-dir/file");
+      AclFeature snapshotFileAclFeature = FSAclBaseTest.getAclFeature(
+          fileInSnapshot, cluster);
+      assertSame(fileAcl, snapshotFileAclFeature);
+      assertSame(dirAcl, snapshotDirAclFeature);
+      hdfs.delete(subdir, true);
+      assertEquals(
+          "Original ACLs references should be maintained for snapshot", 1,
+          snapshotFileAclFeature.getRefCount());
+      assertEquals(
+          "Original ACLs references should be maintained for snapshot", 1,
+          snapshotDirAclFeature.getRefCount());
+      hdfs.deleteSnapshot(path, snapshotName);
+      assertEquals("ACLs should be deleted from snapshot", startSize, AclStorage
+          .getUniqueAclFeatures().getUniqueElementsSize());
+    }
+  }
+
+  private void deleteSnapshotWithAclAndVerify(AclFeature aclFeature,
+      Path pathToCheckAcl, int totalAclFeatures) throws IOException {
+    hdfs.deleteSnapshot(path, snapshotName);
+    AclFeature afterDeleteAclFeature = FSAclBaseTest.getAclFeature(
+        pathToCheckAcl, cluster);
+    assertSame(aclFeature, afterDeleteAclFeature);
+    assertEquals("Reference count should remain same"
+        + " even after deletion of snapshot", 1,
+        afterDeleteAclFeature.getRefCount());
+
+    hdfs.removeAcl(pathToCheckAcl);
+    assertEquals("Reference count should be 0", 0, aclFeature.getRefCount());
+    assertEquals("Unique ACL features should remain same", totalAclFeatures,
+        AclStorage.getUniqueAclFeatures().getUniqueElementsSize());
+  }
+
   /**
    * Asserts that permission is denied to the given fs/user for the given
    * directory.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7efc73c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSAcl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSAcl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSAcl.java
index 2b14fe1..6b44b26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSAcl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSAcl.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.web;
 
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -34,9 +32,7 @@ public class TestWebHDFSAcl extends FSAclBaseTest {
   @BeforeClass
   public static void init() throws Exception {
     conf = WebHdfsTestUtil.createConf();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
+    startCluster();
   }
 
   /**