You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by me...@apache.org on 2019/06/24 03:45:10 UTC

[hbase] branch branch-2 updated: HBASE-21995 Add a coprocessor to set HDFS ACL for hbase granted user

This is an automated email from the ASF dual-hosted git repository.

meiyi pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new d0cb77c  HBASE-21995 Add a coprocessor to set HDFS ACL for hbase granted user
d0cb77c is described below

commit d0cb77ce8a911c7dd216d2527ebf0f056458a82e
Author: meiyi <my...@gamil.com>
AuthorDate: Mon Jun 24 11:33:25 2019 +0800

    HBASE-21995 Add a coprocessor to set HDFS ACL for hbase granted user
---
 .../hadoop/hbase/coprocessor/MasterObserver.java   |  10 +
 .../java/org/apache/hadoop/hbase/io/FileLink.java  |  56 +-
 .../hadoop/hbase/master/MasterCoprocessorHost.java |  10 +
 .../hbase/master/snapshot/TakeSnapshotHandler.java |   6 +-
 .../hbase/security/access/PermissionStorage.java   |  11 +-
 .../access/SnapshotScannerHDFSAclController.java   | 673 +++++++++++++++++++
 .../access/SnapshotScannerHDFSAclHelper.java       | 737 +++++++++++++++++++++
 .../TestSnapshotScannerHDFSAclController.java      | 688 +++++++++++++++++++
 8 files changed, 2175 insertions(+), 16 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index f979429..69abfc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -749,6 +749,16 @@ public interface MasterObserver {
       throws IOException {}
 
   /**
+   * Called after the snapshot operation has been completed.
+   * @param ctx the environment to interact with the framework and master
+   * @param snapshot the SnapshotDescriptor for the snapshot
+   * @param tableDescriptor the TableDescriptor of the table to snapshot
+   */
+  default void postCompletedSnapshotAction(ObserverContext<MasterCoprocessorEnvironment> ctx,
+      SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
+  }
+
+  /**
    * Called before listSnapshots request has been processed.
    * @param ctx the environment to interact with the framework and master
    * @param snapshot the SnapshotDescriptor of the snapshot to list
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
index 36e086a..09ad3f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -26,9 +26,6 @@ import java.io.InputStream;
 import java.io.FileNotFoundException;
 import java.util.List;
 
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.CanUnbuffer;
@@ -40,6 +37,10 @@ import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The FileLink is a sort of hardlink, that allows access to a file given a set of locations.
@@ -297,6 +298,7 @@ public class FileLink {
      * @throws IOException on unexpected error, or file not found.
      */
     private FSDataInputStream tryOpen() throws IOException {
+      IOException exception = null;
       for (Path path: fileLink.getLocations()) {
         if (path.equals(currentPath)) continue;
         try {
@@ -312,14 +314,11 @@ public class FileLink {
           }
           currentPath = path;
           return(in);
-        } catch (FileNotFoundException e) {
-          // Try another file location
-        } catch (RemoteException re) {
-          IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-          if (!(ioe instanceof FileNotFoundException)) throw re;
+        } catch (FileNotFoundException | AccessControlException | RemoteException e) {
+          exception = FileLink.handleAccessLocationException(fileLink, e, exception);
         }
       }
-      throw new FileNotFoundException("Unable to open link: " + fileLink);
+      throw exception;
     }
 
     @Override
@@ -405,14 +404,47 @@ public class FileLink {
    * @throws IOException on unexpected error.
    */
   public FileStatus getFileStatus(FileSystem fs) throws IOException {
+    IOException exception = null;
     for (int i = 0; i < locations.length; ++i) {
       try {
         return fs.getFileStatus(locations[i]);
-      } catch (FileNotFoundException e) {
-        // Try another file location
+      } catch (FileNotFoundException | AccessControlException e) {
+        exception = handleAccessLocationException(this, e, exception);
       }
     }
-    throw new FileNotFoundException("Unable to open link: " + this);
+    throw exception;
+  }
+
+  /**
+   * Handle exceptions which are thrown when accessing the locations of a file link
+   * @param fileLink the file link
+   * @param newException the exception caught while accessing the current location
+   * @param previousException the previous exception caught while accessing the other locations
+   * @return the AccessControlException if accessing any of the locations raised one, otherwise a
+   *         FileNotFoundException. An AccessControlException can be thrown when the user scan
+   *         snapshot feature is enabled, see
+   *         {@link org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclController}.
+   * @throws IOException if the exception is neither AccessControlException nor
+   *           FileNotFoundException
+   */
+  private static IOException handleAccessLocationException(FileLink fileLink,
+      IOException newException, IOException previousException) throws IOException {
+    if (newException instanceof RemoteException) {
+      newException = ((RemoteException) newException)
+          .unwrapRemoteException(FileNotFoundException.class, AccessControlException.class);
+    }
+    if (newException instanceof FileNotFoundException) {
+      // Try another file location
+      if (previousException == null) {
+        previousException = new FileNotFoundException("Unable to open link: " + fileLink);
+      }
+    } else if (newException instanceof AccessControlException) {
+      // Try another file location
+      previousException = newException;
+    } else {
+      throw newException;
+    }
+    return previousException;
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 5ed56ea..13c6764 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -1065,6 +1065,16 @@ public class MasterCoprocessorHost
     });
   }
 
+  public void postCompletedSnapshotAction(SnapshotDescription snapshot,
+      TableDescriptor hTableDescriptor) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.postCompletedSnapshotAction(this, snapshot, hTableDescriptor);
+      }
+    });
+  }
+
   public void preListSnapshot(final SnapshotDescription snapshot) throws IOException {
     execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
       @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
index 0d65264..a5f091b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
@@ -59,7 +59,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 
 /**
@@ -228,6 +228,10 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
       status.markComplete(msg);
       LOG.info(msg);
       metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost()
+            .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), this.htd);
+      }
     } catch (Exception e) { // FindBugs: REC_CATCH_EXCEPTION
       status.abort("Failed to complete snapshot " + snapshot.getName() + " on table " +
           snapshotTable + " because " + e.getMessage());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/PermissionStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/PermissionStorage.java
index bcf070a..6d37fe5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/PermissionStorage.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/PermissionStorage.java
@@ -263,6 +263,7 @@ public final class PermissionStorage {
   static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
       throws IOException{
     Delete d = new Delete(tableName.getName());
+    d.addFamily(ACL_LIST_FAMILY);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing permissions of removed table "+ tableName);
@@ -280,7 +281,7 @@ public final class PermissionStorage {
   static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
       throws IOException{
     Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
-
+    d.addFamily(ACL_LIST_FAMILY);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing permissions of removed namespace "+ namespace);
     }
@@ -839,17 +840,21 @@ public final class PermissionStorage {
   }
 
   public static boolean isGlobalEntry(byte[] entryName) {
-    return entryName != null && TableName.valueOf(entryName).equals(ACL_TABLE_NAME);
+    return Bytes.equals(entryName, ACL_GLOBAL_NAME);
   }
 
   public static boolean isNamespaceEntry(String entryName) {
-    return entryName != null && entryName.charAt(0) == NAMESPACE_PREFIX;
+    return isNamespaceEntry(Bytes.toBytes(entryName));
   }
 
   public static boolean isNamespaceEntry(byte[] entryName) {
     return entryName != null && entryName.length !=0 && entryName[0] == NAMESPACE_PREFIX;
   }
 
+  public static boolean isTableEntry(byte[] entryName) {
+    return !isNamespaceEntry(entryName) && !isGlobalEntry(entryName) && entryName != null;
+  }
+
   public static String toNamespaceEntry(String namespace) {
     return NAMESPACE_PREFIX + namespace;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java
new file mode 100644
index 0000000..c964b67
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SnapshotDescription;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper.PathHelper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Set HDFS ACLs to hFiles to make HBase granted users have permission to scan snapshot
+ * <p>
+ * To use this feature, please make sure the HDFS config is set as follows:
+ * <ul>
+ * <li>dfs.permissions.enabled = true</li>
+ * <li>fs.permissions.umask-mode = 027 (or smaller umask than 027)</li>
+ * </ul>
+ * </p>
+ * <p>
+ * The implementation of this feature is as follows:
+ * <ul>
+ * <li>For common directories such as 'data' and 'archive', set other permission to '--x' to make
+ * everyone have the permission to access the directory.</li>
+ * <li>For namespace or table directories such as 'data/ns/table', 'archive/ns/table' and
+ * '.hbase-snapshot/snapshotName', set user 'r-x' access acl and 'r-x' default acl when following
+ * operations happen:
+ * <ul>
+ * <li>grant user with global, namespace or table permission;</li>
+ * <li>revoke user from global, namespace or table;</li>
+ * <li>snapshot table;</li>
+ * <li>truncate table;</li>
+ * </ul>
+ * </li>
+ * <li>Note: Because snapshots are taken at table level, this feature only considers users with
+ * global, namespace or table permissions and ignores users with table CF or cell permissions.</li>
+ * </ul>
+ * </p>
+ */
+@CoreCoprocessor
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SnapshotScannerHDFSAclController implements MasterCoprocessor, MasterObserver {
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclController.class);
+
+  private SnapshotScannerHDFSAclHelper hdfsAclHelper = null;
+  private PathHelper pathHelper = null;
+  private FileSystem fs = null;
+  private volatile boolean initialized = false;
+  /** Provider for mapping principal names to Users */
+  private UserProvider userProvider;
+
+  @Override
+  public Optional<MasterObserver> getMasterObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> c)
+      throws IOException {
+    if (c.getEnvironment().getConfiguration()
+        .getBoolean(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, false)) {
+      MasterCoprocessorEnvironment mEnv = c.getEnvironment();
+      if (!(mEnv instanceof HasMasterServices)) {
+        throw new IOException("Does not implement HMasterServices");
+      }
+      MasterServices masterServices = ((HasMasterServices) mEnv).getMasterServices();
+      hdfsAclHelper = new SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
+          masterServices.getConnection());
+      pathHelper = hdfsAclHelper.getPathHelper();
+      fs = pathHelper.getFileSystem();
+      hdfsAclHelper.setCommonDirectoryPermission();
+      initialized = true;
+      userProvider = UserProvider.instantiate(c.getEnvironment().getConfiguration());
+    } else {
+      LOG.warn("Try to initialize the coprocessor SnapshotScannerHDFSAclController but failure "
+          + "because the config " + SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE
+          + " is false.");
+    }
+  }
+
+  @Override
+  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
+    if (checkInitialized()) {
+      try (Admin admin = c.getEnvironment().getConnection().getAdmin()) {
+        if (admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
+          // Check if hbase acl table has 'm' CF, if not, add 'm' CF
+          TableDescriptor tableDescriptor = admin.getDescriptor(PermissionStorage.ACL_TABLE_NAME);
+          boolean containHdfsAclFamily =
+              Arrays.stream(tableDescriptor.getColumnFamilies()).anyMatch(family -> Bytes
+                  .equals(family.getName(), SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY));
+          if (!containHdfsAclFamily) {
+            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor)
+                .setColumnFamily(ColumnFamilyDescriptorBuilder
+                    .newBuilder(SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY).build());
+            admin.modifyTable(builder.build());
+          }
+        } else {
+          throw new TableNotFoundException("Table " + PermissionStorage.ACL_TABLE_NAME
+              + " is not created yet. Please check if " + getClass().getName()
+              + " is configured after " + AccessController.class.getName());
+        }
+      }
+    }
+  }
+
+  @Override
+  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) {
+    if (checkInitialized()) {
+      hdfsAclHelper.close();
+    }
+  }
+
+  @Override
+  public void postCompletedCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
+      TableDescriptor desc, RegionInfo[] regions) throws IOException {
+    if (!desc.getTableName().isSystemTable() && checkInitialized()) {
+      TableName tableName = desc.getTableName();
+      List<Path> paths = hdfsAclHelper.getTableRootPaths(tableName, false);
+      for (Path path : paths) {
+        if (!fs.exists(path)) {
+          fs.mkdirs(path);
+        }
+      }
+      // Add table owner HDFS acls
+      String owner =
+          desc.getOwnerString() == null ? getActiveUser(c).getShortName() : desc.getOwnerString();
+      hdfsAclHelper.addTableAcl(tableName, owner);
+      try (Table aclTable =
+          c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
+        SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, owner, tableName);
+      }
+    }
+  }
+
+  @Override
+  public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> c,
+      NamespaceDescriptor ns) throws IOException {
+    if (checkInitialized()) {
+      List<Path> paths = hdfsAclHelper.getNamespaceRootPaths(ns.getName());
+      for (Path path : paths) {
+        if (!fs.exists(path)) {
+          fs.mkdirs(path);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void postCompletedSnapshotAction(ObserverContext<MasterCoprocessorEnvironment> c,
+      SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
+    if (!tableDescriptor.getTableName().isSystemTable() && checkInitialized()) {
+      hdfsAclHelper.snapshotAcl(snapshot);
+    }
+  }
+
+  @Override
+  public void postCompletedTruncateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
+      TableName tableName) throws IOException {
+    if (!tableName.isSystemTable() && checkInitialized()) {
+      hdfsAclHelper.resetTableAcl(tableName);
+    }
+  }
+
+  @Override
+  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+      TableName tableName) throws IOException {
+    if (!tableName.isSystemTable() && checkInitialized()) {
+      /*
+       * Remove a table user's access HDFS acl from the namespace directories if the user has no
+       * global permission, no permission on the table's namespace and none on other tables of
+       * that namespace. E.g. Bob has 'ns1:t1' read permission; when 'ns1:t1' is deleted, if Bob
+       * has global read permission, '@ns1' read permission or 'ns1:other_tables' read permission,
+       * then skip removing Bob's access acl in ns1Dirs, otherwise remove Bob's access acl.
+       */
+      Set<String> removeUsers = new HashSet<>();
+      try (Table aclTable =
+          ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
+        List<String> users = SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName);
+        SnapshotScannerHDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName);
+        byte[] namespace = tableName.getNamespace();
+        for (String user : users) {
+          List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, user);
+          boolean remove = true;
+          for (byte[] entry : userEntries) {
+            if (PermissionStorage.isGlobalEntry(entry)) {
+              remove = false;
+              break;
+            } else if (PermissionStorage.isNamespaceEntry(entry)
+                && Bytes.equals(PermissionStorage.fromNamespaceEntry(entry), namespace)) {
+              remove = false;
+              break;
+            } else if (Bytes.equals(TableName.valueOf(entry).getNamespace(), namespace)) {
+              remove = false;
+              break;
+            }
+          }
+          if (remove) {
+            removeUsers.add(user);
+          }
+        }
+      }
+      if (removeUsers.size() > 0) {
+        hdfsAclHelper.removeNamespaceAcl(tableName, removeUsers);
+      }
+    }
+  }
+
+  @Override
+  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
+      String namespace) throws IOException {
+    if (checkInitialized()) {
+      try (Table aclTable =
+          ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
+        SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(aclTable, namespace);
+      }
+      /**
+       * Delete the namespace tmp directory because it was created by this coprocessor when the
+       * namespace was created, so that the namespace default acl can be inherited by tables. The
+       * namespace data directory is deleted by DeleteNamespaceProcedure, and the namespace archive
+       * directory is deleted by HFileCleaner.
+       */
+      Path tmpNsDir = pathHelper.getTmpNsDir(namespace);
+      if (fs.exists(tmpNsDir)) {
+        if (fs.listStatus(tmpNsDir).length == 0) {
+          fs.delete(tmpNsDir, false);
+        } else {
+          LOG.error("The tmp directory {} of namespace {} is not empty after delete namespace",
+            tmpNsDir, namespace);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
+      UserPermission userPermission, boolean mergeExistingPermissions) throws IOException {
+    if (!checkInitialized()) {
+      return;
+    }
+    try (Table aclTable =
+        c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
+      Configuration conf = c.getEnvironment().getConfiguration();
+      String userName = userPermission.getUser();
+      switch (userPermission.getAccessScope()) {
+        case GLOBAL:
+          UserPermission perm = getUserGlobalPermission(conf, userName);
+          if (perm != null && containReadPermission(perm)) {
+            if (!isHdfsAclSet(aclTable, userName)) {
+              Pair<Set<String>, Set<TableName>> namespaceAndTable =
+                      SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
+              Set<String> skipNamespaces = namespaceAndTable.getFirst();
+              Set<TableName> skipTables = namespaceAndTable.getSecond().stream()
+                      .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
+                      .collect(Collectors.toSet());
+              hdfsAclHelper.grantAcl(userPermission, skipNamespaces, skipTables);
+              SnapshotScannerHDFSAclStorage.addUserGlobalHdfsAcl(aclTable, userName);
+            }
+          } else {
+            // The merged user permission doesn't contain READ, so remove user global HDFS acls if
+            // it's set
+            removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
+          }
+          break;
+        case NAMESPACE:
+          String namespace = ((NamespacePermission) userPermission.getPermission()).getNamespace();
+          UserPermission nsPerm = getUserNamespacePermission(conf, userName, namespace);
+          if (nsPerm != null && containReadPermission(nsPerm)) {
+            if (!isHdfsAclSet(aclTable, userName, namespace)) {
+              Set<TableName> skipTables = SnapshotScannerHDFSAclStorage
+                      .getUserNamespaceAndTable(aclTable, userName).getSecond();
+              hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), skipTables);
+            }
+            SnapshotScannerHDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, userName, namespace);
+          } else {
+            // The merged user permission doesn't contain READ, so remove user namespace HDFS acls
+            // if it's set
+            removeUserNamespaceHdfsAcl(aclTable, userName, namespace, userPermission);
+          }
+          break;
+        case TABLE:
+          TableName tableName = ((TablePermission) userPermission.getPermission()).getTableName();
+          UserPermission tPerm = getUserTablePermission(conf, userName, tableName);
+          if (tPerm != null) {
+            TablePermission tablePermission = (TablePermission) tPerm.getPermission();
+            if (tablePermission.hasFamily() || tablePermission.hasQualifier()) {
+              break;
+            }
+          }
+          if (tPerm != null && containReadPermission(tPerm)) {
+            if (!isHdfsAclSet(aclTable, userName, tableName)) {
+              hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
+            }
+            SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, userName, tableName);
+          } else {
+            // The merged user permission doesn't contain READ, so remove user table HDFS acls if
+            // it's set
+            removeUserTableHdfsAcl(aclTable, userName, tableName, userPermission);
+          }
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Illegal user permission scope " + userPermission.getAccessScope());
+      }
+    }
+  }
+
+  @Override
+  public void postRevoke(ObserverContext<MasterCoprocessorEnvironment> c,
+      UserPermission userPermission) throws IOException {
+    if (checkInitialized()) {
+      try (Table aclTable =
+          c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
+        String userName = userPermission.getUser();
+        Configuration conf = c.getEnvironment().getConfiguration();
+        switch (userPermission.getAccessScope()) {
+          case GLOBAL:
+            UserPermission userGlobalPerm = getUserGlobalPermission(conf, userName);
+            if (userGlobalPerm == null || !containReadPermission(userGlobalPerm)) {
+              removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
+            }
+            break;
+          case NAMESPACE:
+            NamespacePermission nsPerm = (NamespacePermission) userPermission.getPermission();
+            UserPermission userNsPerm =
+                getUserNamespacePermission(conf, userName, nsPerm.getNamespace());
+            if (userNsPerm == null || !containReadPermission(userNsPerm)) {
+              removeUserNamespaceHdfsAcl(aclTable, userName, nsPerm.getNamespace(), userPermission);
+            }
+            break;
+          case TABLE:
+            TablePermission tPerm = (TablePermission) userPermission.getPermission();
+            UserPermission userTablePerm =
+                getUserTablePermission(conf, userName, tPerm.getTableName());
+            if (userTablePerm == null || !containReadPermission(userTablePerm)) {
+              removeUserTableHdfsAcl(aclTable, userName, tPerm.getTableName(), userPermission);
+            }
+            break;
+          default:
+            throw new IllegalArgumentException(
+                "Illegal user permission scope " + userPermission.getAccessScope());
+        }
+      }
+    }
+  }
+
+  private void removeUserGlobalHdfsAcl(Table aclTable, String userName,
+      UserPermission userPermission) throws IOException {
+    if (SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
+      // remove user global acls but preserve ns and table acls
+      Pair<Set<String>, Set<TableName>> namespaceAndTable =
+          SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
+      Set<String> skipNamespaces = namespaceAndTable.getFirst();
+      Set<TableName> skipTables = namespaceAndTable.getSecond().stream()
+          .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
+          .collect(Collectors.toSet());
+      hdfsAclHelper.revokeAcl(userPermission, skipNamespaces, skipTables);
+      SnapshotScannerHDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, userName);
+    }
+  }
+
+  private void removeUserNamespaceHdfsAcl(Table aclTable, String userName, String namespace,
+      UserPermission userPermission) throws IOException {
+    // remove user ns acls but preserve table acls
+    if (SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace)) {
+      if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
+        Set<TableName> skipTables =
+            SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName).getSecond();
+        hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(), skipTables);
+      }
+      SnapshotScannerHDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, userName, namespace);
+    }
+  }
+
+  /**
+   * Remove the HDFS acls that were set for a user's table permission. The HDFS acls are only
+   * revoked when the user has neither a global record nor a record on the table's namespace;
+   * the table record itself is deleted in either case. Does nothing when no HDFS acl record
+   * exists for the user on the table.
+   * @param aclTable the HBase acl table that stores the HDFS acl records
+   * @param userName the user whose table HDFS acls are removed
+   * @param tableName the table
+   * @param userPermission the permission handed to the HDFS acl helper for the revoke
+   * @throws IOException if reading or updating the acl table fails
+   */
+  private void removeUserTableHdfsAcl(Table aclTable, String userName, TableName tableName,
+      UserPermission userPermission) throws IOException {
+    if (!SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName)) {
+      return;
+    }
+    boolean coveredByWiderScope =
+        SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)
+            || SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
+              tableName.getNamespaceAsString());
+    if (!coveredByWiderScope) {
+      // remove table acls, nothing needs to be skipped
+      hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
+    }
+    SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, tableName);
+  }
+
+  /**
+   * Check whether the given user permission includes the READ action.
+   * @param userPermission the user permission, may be null
+   * @return true if the permission is not null and one of its actions is READ
+   */
+  private boolean containReadPermission(UserPermission userPermission) {
+    if (userPermission == null) {
+      return false;
+    }
+    for (Action action : userPermission.getPermission().getActions()) {
+      if (action == Action.READ) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Look up the global permission of a user in the permission storage.
+   * @param conf the configuration used to read the permission storage
+   * @param userName the user
+   * @return the first permission returned by the storage, or null if there is none
+   * @throws IOException if the lookup fails
+   */
+  private UserPermission getUserGlobalPermission(Configuration conf, String userName)
+      throws IOException {
+    List<UserPermission> found = PermissionStorage.getUserPermissions(conf,
+      PermissionStorage.ACL_GLOBAL_NAME, null, null, userName, true);
+    return found == null || found.isEmpty() ? null : found.get(0);
+  }
+
+  /**
+   * Look up the permission of a user on a namespace in the permission storage.
+   * @param conf the configuration used to read the permission storage
+   * @param userName the user
+   * @param namespace the namespace
+   * @return the first permission returned by the storage, or null if there is none
+   * @throws IOException if the lookup fails
+   */
+  private UserPermission getUserNamespacePermission(Configuration conf, String userName,
+      String namespace) throws IOException {
+    List<UserPermission> found =
+        PermissionStorage.getUserNamespacePermissions(conf, namespace, userName, true);
+    return found == null || found.isEmpty() ? null : found.get(0);
+  }
+
+  /**
+   * Look up the permission of a user on a table in the permission storage.
+   * @param conf the configuration used to read the permission storage
+   * @param userName the user
+   * @param tableName the table
+   * @return the first permission returned by the storage, or null if there is none
+   * @throws IOException if the lookup fails
+   */
+  private UserPermission getUserTablePermission(Configuration conf, String userName,
+      TableName tableName) throws IOException {
+    List<UserPermission> found =
+        PermissionStorage.getUserTablePermissions(conf, tableName, null, null, userName, true);
+    return found == null || found.isEmpty() ? null : found.get(0);
+  }
+
+  /**
+   * Check if the user's global HDFS acl record exists; delegates to the four-argument overload
+   * with neither a namespace nor a table.
+   */
+  private boolean isHdfsAclSet(Table aclTable, String userName) throws IOException {
+    return isHdfsAclSet(aclTable, userName, null, null);
+  }
+
+  /**
+   * Check if the user's global or namespace HDFS acl record exists; delegates to the
+   * four-argument overload without a table.
+   */
+  private boolean isHdfsAclSet(Table aclTable, String userName, String namespace)
+      throws IOException {
+    return isHdfsAclSet(aclTable, userName, namespace, null);
+  }
+
+  /**
+   * Check if the user's global, namespace (of the table) or table HDFS acl record exists;
+   * delegates to the four-argument overload without an explicit namespace.
+   */
+  private boolean isHdfsAclSet(Table aclTable, String userName, TableName tableName)
+      throws IOException {
+    return isHdfsAclSet(aclTable, userName, null, tableName);
+  }
+
+  /**
+   * Check if the user's global/namespace/table HDFS acls are already recorded as set on hfiles.
+   * The global record is always checked; the namespace record is checked when a namespace is
+   * given; for a given table both the record of its namespace and the table record are checked.
+   * @param aclTable the HBase acl table that stores the HDFS acl records
+   * @param userName the user
+   * @param namespace the namespace to check, or null
+   * @param tableName the table to check, or null
+   * @return true if any of the checked records exists
+   * @throws IOException if reading the acl table fails
+   */
+  private boolean isHdfsAclSet(Table aclTable, String userName, String namespace,
+      TableName tableName) throws IOException {
+    if (SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
+      return true;
+    }
+    if (namespace != null
+        && SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace)) {
+      return true;
+    }
+    if (tableName == null) {
+      return false;
+    }
+    return SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
+      tableName.getNamespaceAsString())
+        || SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName);
+  }
+
+  /**
+   * Report whether this coprocessor has been initialized.
+   * @return the current value of the {@code initialized} flag
+   */
+  private boolean checkInitialized() {
+    return initialized;
+  }
+
+  /**
+   * Resolve the user on whose behalf the current operation runs.
+   * @param ctx the observer context of the operation
+   * @return the caller recorded in the context, or the user provider's current user when the
+   *         context carries no caller (non-rpc handling)
+   * @throws IOException if the current user cannot be obtained from the user provider
+   */
+  private User getActiveUser(ObserverContext<?> ctx) throws IOException {
+    Optional<User> caller = ctx.getCaller();
+    // for non-rpc handling, fallback to system user
+    return caller.isPresent() ? caller.get() : userProvider.getCurrent();
+  }
+
+  /**
+   * Static accessors for the HDFS acl records kept in the HBase acl table. A record is a cell in
+   * the {@link #HDFS_ACL_FAMILY} family whose row is an acl table entry (the global entry, a
+   * namespace entry or a table name) and whose qualifier is the user name.
+   */
+  static final class SnapshotScannerHDFSAclStorage {
+    /**
+     * Add a new CF in HBase acl table to record if the HBase read permission is synchronized to
+     * related hfiles. The record has two usages: 1. check if we need to remove HDFS acls for a
+     * grant without READ permission(eg: grant user table read permission and then grant user table
+     * write permission without merging the existing permissions, in this case, need to remove HDFS
+     * acls); 2. skip some HDFS acl sync because it may be already set(eg: grant user table read
+     * permission and then grant user ns read permission; grant user table read permission and then
+     * grant user table write permission with merging the existing permissions).
+     */
+    static final byte[] HDFS_ACL_FAMILY = Bytes.toBytes("m");
+    // The value 'R' has no specific meaning; a non-null cell value only marks that the user's
+    // HDFS acls are set on hfiles.
+    private static final byte[] HDFS_ACL_VALUE = Bytes.toBytes("R");
+
+    /** Record that the user's global HDFS acls are set. */
+    static void addUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
+      addUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
+    }
+
+    /** Record that the user's HDFS acls on the namespace are set. */
+    static void addUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
+        throws IOException {
+      addUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
+    }
+
+    /** Record that the user's HDFS acls on the table are set. */
+    static void addUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
+        throws IOException {
+      addUserEntry(aclTable, user, tableName.getName());
+    }
+
+    /** Write the marker cell for the user into the row of the given entry. */
+    private static void addUserEntry(Table t, String user, byte[] entry) throws IOException {
+      Put p = new Put(entry);
+      p.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(user), HDFS_ACL_VALUE);
+      t.put(p);
+    }
+
+    /** Delete the user's global HDFS acl record. */
+    static void deleteUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
+      deleteUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
+    }
+
+    /** Delete the user's HDFS acl record of the namespace. */
+    static void deleteUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
+        throws IOException {
+      deleteUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
+    }
+
+    /** Delete the user's HDFS acl record of the table. */
+    static void deleteUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
+        throws IOException {
+      deleteUserEntry(aclTable, user, tableName.getName());
+    }
+
+    /** Delete the user's column from the row of the given entry. */
+    private static void deleteUserEntry(Table aclTable, String user, byte[] entry)
+        throws IOException {
+      Delete delete = new Delete(entry);
+      delete.addColumns(HDFS_ACL_FAMILY, Bytes.toBytes(user));
+      aclTable.delete(delete);
+    }
+
+    /** Delete the HDFS acl records of all users on the namespace. */
+    static void deleteNamespaceHdfsAcl(Table aclTable, String namespace) throws IOException {
+      deleteEntry(aclTable, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
+    }
+
+    /** Delete the HDFS acl records of all users on the table. */
+    static void deleteTableHdfsAcl(Table aclTable, TableName tableName) throws IOException {
+      deleteEntry(aclTable, tableName.getName());
+    }
+
+    /** Delete the whole HDFS acl family from the row of the given entry. */
+    private static void deleteEntry(Table aclTable, byte[] entry) throws IOException {
+      Delete delete = new Delete(entry);
+      delete.addFamily(HDFS_ACL_FAMILY);
+      aclTable.delete(delete);
+    }
+
+    /**
+     * List the users that have an HDFS acl record on the table.
+     * @return the user names, empty if the table row has no record
+     */
+    static List<String> getTableUsers(Table aclTable, TableName tableName) throws IOException {
+      return getEntryUsers(aclTable, tableName.getName());
+    }
+
+    /** Collect the qualifiers (user names) of all HDFS acl cells in the row of the entry. */
+    private static List<String> getEntryUsers(Table aclTable, byte[] entry) throws IOException {
+      List<String> users = new ArrayList<>();
+      Get get = new Get(entry);
+      get.addFamily(HDFS_ACL_FAMILY);
+      Result result = aclTable.get(get);
+      List<Cell> cells = result.listCells();
+      if (cells != null) {
+        for (Cell cell : cells) {
+          if (cell != null) {
+            users.add(Bytes.toString(CellUtil.cloneQualifier(cell)));
+          }
+        }
+      }
+      return users;
+    }
+
+    /**
+     * Split the entries that hold an HDFS acl record of the user into namespaces and tables.
+     * Entries that are neither a namespace entry nor a table entry are ignored.
+     * @return a pair of the recorded namespaces and the recorded tables
+     */
+    static Pair<Set<String>, Set<TableName>> getUserNamespaceAndTable(Table aclTable,
+        String userName) throws IOException {
+      Set<String> namespaces = new HashSet<>();
+      Set<TableName> tables = new HashSet<>();
+      List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName);
+      for (byte[] entry : userEntries) {
+        if (PermissionStorage.isNamespaceEntry(entry)) {
+          namespaces.add(Bytes.toString(PermissionStorage.fromNamespaceEntry(entry)));
+        } else if (PermissionStorage.isTableEntry(entry)) {
+          tables.add(TableName.valueOf(entry));
+        }
+      }
+      return new Pair<>(namespaces, tables);
+    }
+
+    /**
+     * Scan the acl table for every row that holds an HDFS acl record of the user.
+     * @return the row keys of the matching rows
+     */
+    static List<byte[]> getUserEntries(Table aclTable, String userName) throws IOException {
+      Scan scan = new Scan();
+      scan.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
+      List<byte[]> entry = new ArrayList<>();
+      // Close the scanner once the rows are consumed so that it is not leaked.
+      try (ResultScanner scanner = aclTable.getScanner(scan)) {
+        for (Result result : scanner) {
+          if (result != null && result.getRow() != null) {
+            entry.add(result.getRow());
+          }
+        }
+      }
+      return entry;
+    }
+
+    /** Check if the user's global HDFS acl record exists. */
+    static boolean hasUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
+      return hasUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
+    }
+
+    /** Check if the user's HDFS acl record of the namespace exists. */
+    static boolean hasUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
+        throws IOException {
+      return hasUserEntry(aclTable, user,
+        Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
+    }
+
+    /** Check if the user's HDFS acl record of the table exists. */
+    static boolean hasUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
+        throws IOException {
+      return hasUserEntry(aclTable, user, tableName.getName());
+    }
+
+    /** Check if the row of the entry has an HDFS acl cell for the user. */
+    private static boolean hasUserEntry(Table aclTable, String userName, byte[] entry)
+        throws IOException {
+      Get get = new Get(entry);
+      get.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
+      return aclTable.exists(get);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java
new file mode 100644
index 0000000..0bd766b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
+import static org.apache.hadoop.hbase.security.access.Permission.Action.READ;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.SnapshotDescription;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A helper to modify or remove HBase granted user default and access HDFS ACLs over hFiles.
+ */
+@InterfaceAudience.Private
+public class SnapshotScannerHDFSAclHelper implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclHelper.class);
+
+  public static final String USER_SCAN_SNAPSHOT_ENABLE = "hbase.user.scan.snapshot.enable";
+  // Size of the thread pool that applies the HDFS ACL operations; the constructor uses 10 when
+  // the key is not configured.
+  public static final String USER_SCAN_SNAPSHOT_THREAD_NUMBER =
+      "hbase.user.scan.snapshot.thread.number";
+  // The tmp directory used to restore snapshots; it must not be a sub directory of the HBase
+  // root dir.
+  public static final String SNAPSHOT_RESTORE_TMP_DIR = "hbase.snapshot.restore.tmp.dir";
+  public static final String SNAPSHOT_RESTORE_TMP_DIR_DEFAULT =
+      "/hbase/.tmpdir-to-restore-snapshot";
+  // The permission applied to the common directories if the feature is enabled.
+  public static final String COMMON_DIRECTORY_PERMISSION =
+      "hbase.user.scan.snapshot.common.directory.permission";
+  // The secure HBase permission is 700. 751 gives all other users execute access and sets the
+  // mask to read-execute so that the extended access ACL entries can take effect. Be cautious
+  // when changing this value.
+  public static final String COMMON_DIRECTORY_PERMISSION_DEFAULT = "751";
+  // The permission applied to the snapshot restore directory if the feature is enabled.
+  public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION =
+      "hbase.user.scan.snapshot.restore.directory.permission";
+  // 753 means all other users have write-execute access.
+  public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT = "753";
+
+  // Admin obtained from the connection passed to the constructor; closed in close().
+  private Admin admin;
+  private final Configuration conf;
+  // File system obtained from the path helper.
+  private FileSystem fs;
+  private PathHelper pathHelper;
+  // Fixed-size pool of daemon threads that runs the HDFS ACL operations; shut down in close().
+  private ExecutorService pool;
+
+  /**
+   * Create a helper that works on the file system resolved by a {@link PathHelper} for the
+   * given configuration and runs its HDFS ACL operations on a fixed-size pool of daemon threads.
+   * @param configuration the configuration; {@link #USER_SCAN_SNAPSHOT_THREAD_NUMBER} sets the
+   *          pool size (10 if unset)
+   * @param connection the connection used to obtain an {@link Admin}
+   * @throws IOException if the path helper or the admin cannot be created
+   */
+  public SnapshotScannerHDFSAclHelper(Configuration configuration, Connection connection)
+      throws IOException {
+    this.conf = configuration;
+    this.pathHelper = new PathHelper(configuration);
+    this.fs = this.pathHelper.getFileSystem();
+    int threadNumber = configuration.getInt(USER_SCAN_SNAPSHOT_THREAD_NUMBER, 10);
+    this.pool = Executors.newFixedThreadPool(threadNumber,
+      new ThreadFactoryBuilder().setNameFormat("hdfs-acl-thread-%d").setDaemon(true).build());
+    this.admin = connection.getAdmin();
+  }
+
+  /**
+   * Shut down the worker pool and close the admin. A failure to close the admin is logged and
+   * not propagated.
+   */
+  @Override
+  public void close() {
+    if (null != pool) {
+      pool.shutdown();
+    }
+    try {
+      admin.close();
+    } catch (IOException closeError) {
+      LOG.error("Close admin error", closeError);
+    }
+  }
+
+  /**
+   * Create the common directories if they are missing and apply the configured common directory
+   * permission to them, then create the snapshot restore directory with its own permission if
+   * it does not exist yet.
+   * @throws IOException if a file system operation fails
+   */
+  public void setCommonDirectoryPermission() throws IOException {
+    // The public directories get the common permission (751 by default) so that all users can
+    // traverse them. The parent of the HBase root directory needs the same access, but it is
+    // not changed here because the owner of the HBase root directory may not be allowed to
+    // change the permission of its parent.
+    // The {root/.tmp} and {root/.tmp/data} directories are created so that global user HDFS
+    // ACLs can be inherited.
+    List<Path> commonDirs = Lists.newArrayList(pathHelper.getRootDir(), pathHelper.getMobDir(),
+      pathHelper.getTmpDir(), pathHelper.getArchiveDir());
+    commonDirs.addAll(getGlobalRootPaths());
+    for (Path dir : commonDirs) {
+      if (!fs.exists(dir)) {
+        fs.mkdirs(dir);
+      }
+      String mode = conf.get(COMMON_DIRECTORY_PERMISSION, COMMON_DIRECTORY_PERMISSION_DEFAULT);
+      fs.setPermission(dir, new FsPermission(mode));
+    }
+    // create snapshot restore directory; an existing one is left untouched
+    String restoreDirName = conf.get(SNAPSHOT_RESTORE_TMP_DIR, SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
+    Path restoreDir = new Path(restoreDirName);
+    if (fs.exists(restoreDir)) {
+      return;
+    }
+    fs.mkdirs(restoreDir);
+    String restoreMode = conf.get(SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
+      SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT);
+    fs.setPermission(restoreDir, new FsPermission(restoreMode));
+  }
+
+  /**
+   * Set acl when grant user permission. IOExceptions raised while applying the acl to a single
+   * path are logged by the worker tasks and do not make this method return false.
+   * @param userPermission the user and permission
+   * @param skipNamespaces the namespace set to skip set acl because already set
+   * @param skipTables the table set to skip set acl because already set
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean grantAcl(UserPermission userPermission, Set<String> skipNamespaces,
+      Set<TableName> skipTables) {
+    try {
+      long start = System.currentTimeMillis();
+      handleGrantOrRevokeAcl(userPermission, HDFSAclOperation.OperationType.MODIFY, skipNamespaces,
+        skipTables);
+      LOG.info("Set HDFS acl when grant {}, cost {} ms", userPermission,
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt status so that the caller can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+      LOG.error("Set HDFS acl error when grant: {}", userPermission, e);
+      return false;
+    }
+  }
+
+  /**
+   * Remove acl when grant or revoke user permission. IOExceptions raised while removing the acl
+   * from a single path are logged by the worker tasks and do not make this method return false.
+   * @param userPermission the user and permission
+   * @param skipNamespaces the namespace set to skip remove acl
+   * @param skipTables the table set to skip remove acl
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean revokeAcl(UserPermission userPermission, Set<String> skipNamespaces,
+      Set<TableName> skipTables) {
+    try {
+      long start = System.currentTimeMillis();
+      handleGrantOrRevokeAcl(userPermission, HDFSAclOperation.OperationType.REMOVE, skipNamespaces,
+        skipTables);
+      LOG.info("Set HDFS acl when revoke {}, cost {} ms", userPermission,
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt status so that the caller can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+      LOG.error("Set HDFS acl error when revoke: {}", userPermission, e);
+      return false;
+    }
+  }
+
+  /**
+   * Set acl when take a snapshot: the users with READ permission on the snapshot's table or on
+   * its namespace get default and access acls on the snapshot directory, recursively.
+   * @param snapshot the snapshot desc
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean snapshotAcl(SnapshotDescription snapshot) {
+    try {
+      long start = System.currentTimeMillis();
+      TableName tableName = snapshot.getTableName();
+      // global user permission can be inherited from default acl automatically
+      Set<String> userSet = getUsersWithTableReadAction(tableName);
+      userSet.addAll(getUsersWithNamespaceReadAction(tableName.getNamespaceAsString()));
+      Path path = pathHelper.getSnapshotDir(snapshot.getName());
+      handleHDFSAcl(new HDFSAclOperation(fs, path, userSet, HDFSAclOperation.OperationType.MODIFY,
+          true, HDFSAclOperation.AclType.DEFAULT_ADN_ACCESS)).get();
+      LOG.info("Set HDFS acl when snapshot {}, cost {} ms", snapshot.getName(),
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt status so that the caller can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+      LOG.error("Set HDFS acl error when snapshot {}", snapshot, e);
+      return false;
+    }
+  }
+
+  /**
+   * Reset acl when truncate table: the users with READ permission on the table get their table
+   * acls set again.
+   * @param tableName the specific table
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean resetTableAcl(TableName tableName) {
+    try {
+      long start = System.currentTimeMillis();
+      // global and namespace user permission can be inherited from default acl automatically
+      setTableAcl(tableName, getUsersWithTableReadAction(tableName));
+      LOG.info("Set HDFS acl when truncate {}, cost {} ms", tableName,
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt status so that the caller can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+      LOG.error("Set HDFS acl error when truncate {}", tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Remove the access acl of the given users from the namespace directories of a table when the
+   * table is deleted.
+   * @param tableName the table
+   * @param removeUsers the users whose access acl will be removed
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean removeNamespaceAcl(TableName tableName, Set<String> removeUsers) {
+    try {
+      long start = System.currentTimeMillis();
+      List<AclEntry> aclEntries = new ArrayList<>();
+      for (String removeUser : removeUsers) {
+        aclEntries.add(aclEntry(ACCESS, removeUser));
+      }
+      String namespace = tableName.getNamespaceAsString();
+      List<Path> nsPaths = Lists.newArrayList(pathHelper.getTmpNsDir(namespace),
+        pathHelper.getDataNsDir(namespace), pathHelper.getMobDataNsDir(namespace));
+      // The archive ns acl is only removed when the table has no snapshots. Otherwise it is
+      // kept so that the snapshots can still be scanned; in that case the archive ns acl needs
+      // to be removed once all snapshots of the deleted table are deleted (will do it in later
+      // work).
+      boolean tableHasSnapshots = !getTableSnapshotPaths(tableName).isEmpty();
+      if (!tableHasSnapshots) {
+        nsPaths.add(pathHelper.getArchiveNsDir(namespace));
+      }
+      for (Path nsPath : nsPaths) {
+        fs.removeAclEntries(nsPath, aclEntries);
+      }
+      LOG.info("Remove HDFS acl when delete table {}, cost {} ms", tableName,
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Set HDFS acl error when delete table {}", tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Set table owner acl when create table.
+   * @param tableName the table
+   * @param user the table owner
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean addTableAcl(TableName tableName, String user) {
+    try {
+      long start = System.currentTimeMillis();
+      setTableAcl(tableName, Sets.newHashSet(user));
+      LOG.info("Set HDFS acl when create table {}, cost {} ms", tableName,
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt status so that the caller can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+      LOG.error("Set HDFS acl error when create table {}", tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Dispatch a grant or revoke to the handler that matches the scope of the permission.
+   * @param userPermission the user and permission
+   * @param operationType whether the acls are modified or removed
+   * @param skipNamespaces the namespaces that are left untouched
+   * @param skipTables the tables that are left untouched
+   * @throws IllegalArgumentException if the scope is not global, namespace or table
+   */
+  private void handleGrantOrRevokeAcl(UserPermission userPermission,
+      HDFSAclOperation.OperationType operationType, Set<String> skipNamespaces,
+      Set<TableName> skipTables) throws ExecutionException, InterruptedException, IOException {
+    Set<String> users = Sets.newHashSet(userPermission.getUser());
+    switch (userPermission.getAccessScope()) {
+      case GLOBAL: {
+        handleGlobalAcl(users, skipNamespaces, skipTables, operationType);
+        break;
+      }
+      case NAMESPACE: {
+        NamespacePermission nsPermission = (NamespacePermission) userPermission.getPermission();
+        Set<String> namespaces = Sets.newHashSet(nsPermission.getNamespace());
+        handleNamespaceAcl(namespaces, users, skipNamespaces, skipTables, operationType);
+        break;
+      }
+      case TABLE: {
+        TablePermission tblPermission = (TablePermission) userPermission.getPermission();
+        // the namespace directories get the access acl first, then the table directories
+        handleNamespaceAccessAcl(tblPermission.getNamespace(), users, operationType);
+        Set<TableName> tables = Sets.newHashSet(tblPermission.getTableName());
+        handleTableAcl(tables, users, skipNamespaces, skipTables, operationType);
+        break;
+      }
+      default:
+        throw new IllegalArgumentException(
+            "Illegal user permission scope " + userPermission.getAccessScope());
+    }
+  }
+
+  /**
+   * Apply the operation to the global root directories (default and access acls, not
+   * recursive), wait for it to finish, then apply it to all namespaces listed by the admin.
+   * @param users the users the acls belong to
+   * @param skipNamespaces the namespaces that are left untouched
+   * @param skipTables the tables that are left untouched
+   * @param operationType whether the acls are modified or removed
+   */
+  private void handleGlobalAcl(Set<String> users, Set<String> skipNamespaces,
+      Set<TableName> skipTables, HDFSAclOperation.OperationType operationType)
+      throws ExecutionException, InterruptedException, IOException {
+    // handle global root directories HDFS acls
+    List<HDFSAclOperation> rootOperations = new ArrayList<>();
+    for (Path rootPath : getGlobalRootPaths()) {
+      rootOperations.add(new HDFSAclOperation(fs, rootPath, users, operationType, false,
+          HDFSAclOperation.AclType.DEFAULT_ADN_ACCESS));
+    }
+    handleHDFSAclParallel(rootOperations).get();
+    // handle namespace HDFS acls
+    Set<String> allNamespaces = Sets.newHashSet(admin.listNamespaces());
+    handleNamespaceAcl(allNamespaces, users, skipNamespaces, skipTables, operationType);
+  }
+
+  /**
+   * Apply the operation to the root directories of the given namespaces and then to the tables
+   * of those namespaces. The skipped namespaces are removed from {@code namespaces} in place.
+   * @param namespaces the namespaces to handle; modified by this method
+   * @param users the users the acls belong to
+   * @param skipNamespaces the namespaces that are left untouched
+   * @param skipTables the tables that are left untouched
+   * @param operationType whether the acls are modified or removed
+   */
+  private void handleNamespaceAcl(Set<String> namespaces, Set<String> users,
+      Set<String> skipNamespaces, Set<TableName> skipTables,
+      HDFSAclOperation.OperationType operationType)
+      throws ExecutionException, InterruptedException, IOException {
+    namespaces.removeAll(skipNamespaces);
+    Set<String> skipTableNamespaces = new HashSet<>();
+    for (TableName skipTable : skipTables) {
+      skipTableNamespaces.add(skipTable.getNamespaceAsString());
+    }
+    // handle namespace root directories HDFS acls
+    boolean removing = operationType == HDFSAclOperation.OperationType.REMOVE;
+    List<HDFSAclOperation> nsOperations = new ArrayList<>();
+    for (String ns : namespaces) {
+      // When removing from a namespace that contains a skipped table, only the DEFAULT acl of
+      // the namespace directories is removed and the ACCESS acl is kept for the skipped tables.
+      // In every other case the DEFAULT + ACCESS acls are operated on.
+      HDFSAclOperation.AclType aclType = removing && skipTableNamespaces.contains(ns)
+          ? HDFSAclOperation.AclType.DEFAULT
+          : HDFSAclOperation.AclType.DEFAULT_ADN_ACCESS;
+      for (Path nsPath : getNamespaceRootPaths(ns)) {
+        nsOperations.add(new HDFSAclOperation(fs, nsPath, users, operationType, false, aclType));
+      }
+    }
+    handleHDFSAclParallel(nsOperations).get();
+    // handle table directories HDFS acls
+    Set<TableName> tables = new HashSet<>();
+    for (String namespace : namespaces) {
+      for (TableDescriptor descriptor : admin
+          .listTableDescriptorsByNamespace(Bytes.toBytes(namespace))) {
+        tables.add(descriptor.getTableName());
+      }
+    }
+    handleTableAcl(tables, users, skipNamespaces, skipTables, operationType);
+  }
+
+  /**
+   * Apply the operation recursively to the root directories (including snapshot directories) of
+   * every given table that is neither skipped itself nor in a skipped namespace, and wait until
+   * all tables are done. The paths of one table are handled one after another.
+   * @param tableNames the candidate tables
+   * @param users the users the acls belong to
+   * @param skipNamespaces the namespaces whose tables are left untouched
+   * @param skipTables the tables that are left untouched
+   * @param operationType whether the acls are modified or removed
+   */
+  private void handleTableAcl(Set<TableName> tableNames, Set<String> users,
+      Set<String> skipNamespaces, Set<TableName> skipTables,
+      HDFSAclOperation.OperationType operationType)
+      throws ExecutionException, InterruptedException, IOException {
+    Set<TableName> tablesToHandle = tableNames.stream()
+        .filter(table -> !skipTables.contains(table))
+        .filter(table -> !skipNamespaces.contains(table.getNamespaceAsString()))
+        .collect(Collectors.toSet());
+    // handle table HDFS acls
+    List<CompletableFuture<Void>> tableFutures = new ArrayList<>();
+    for (TableName table : tablesToHandle) {
+      List<HDFSAclOperation> tableOperations = new ArrayList<>();
+      for (Path tablePath : getTableRootPaths(table, true)) {
+        tableOperations.add(new HDFSAclOperation(fs, tablePath, users, operationType, true,
+            HDFSAclOperation.AclType.DEFAULT_ADN_ACCESS));
+      }
+      tableFutures.add(handleHDFSAclSequential(tableOperations));
+    }
+    CompletableFuture
+        .allOf(tableFutures.toArray(new CompletableFuture[tableFutures.size()])).get();
+  }
+
+  /**
+   * Apply the operation to the ACCESS acl of the namespace root directories (not recursive) and
+   * wait for it to finish.
+   * @param namespace the namespace
+   * @param users the users the acls belong to
+   * @param operationType whether the acls are modified or removed
+   */
+  private void handleNamespaceAccessAcl(String namespace, Set<String> users,
+      HDFSAclOperation.OperationType operationType)
+      throws ExecutionException, InterruptedException {
+    // handle namespace access HDFS acls
+    List<HDFSAclOperation> accessOperations = new ArrayList<>();
+    for (Path nsPath : getNamespaceRootPaths(namespace)) {
+      accessOperations.add(new HDFSAclOperation(fs, nsPath, users, operationType, false,
+          HDFSAclOperation.AclType.ACCESS));
+    }
+    handleHDFSAclParallel(accessOperations).get();
+  }
+
+  /**
+   * Set (MODIFY) the acls of the users for one table: first the access acl on the namespace
+   * directories of the table, then the acls on the table directories, skipping nothing.
+   * @param tableName the table
+   * @param users the users the acls belong to
+   */
+  private void setTableAcl(TableName tableName, Set<String> users)
+      throws ExecutionException, InterruptedException, IOException {
+    handleNamespaceAccessAcl(tableName.getNamespaceAsString(), users,
+      HDFSAclOperation.OperationType.MODIFY);
+    Set<String> noSkipNamespaces = new HashSet<>(0);
+    Set<TableName> noSkipTables = new HashSet<>(0);
+    handleTableAcl(Sets.newHashSet(tableName), users, noSkipNamespaces, noSkipTables,
+      HDFSAclOperation.OperationType.MODIFY);
+  }
+
+  /**
+   * Return the paths that a user with global permission will visit: the tmp data, data, mob
+   * data, archive data and snapshot root directories.
+   * @return the path list
+   */
+  private List<Path> getGlobalRootPaths() {
+    List<Path> globalRoots = Lists.newArrayList(pathHelper.getTmpDataDir(),
+      pathHelper.getDataDir(), pathHelper.getMobDataDir(), pathHelper.getArchiveDataDir(),
+      pathHelper.getSnapshotRootDir());
+    return globalRoots;
+  }
+
+  /**
+   * Return the paths that a user with namespace permission will visit: the tmp, data, mob data
+   * and archive directories of the namespace.
+   * @param namespace the namespace
+   * @return the path list
+   */
+  List<Path> getNamespaceRootPaths(String namespace) {
+    return Lists.newArrayList(pathHelper.getTmpNsDir(namespace),
+      pathHelper.getDataNsDir(namespace), pathHelper.getMobDataNsDir(namespace),
+      pathHelper.getArchiveNsDir(namespace));
+  }
+
+  /**
+   * Return the paths that a user with table permission will visit: the tmp, data, mob and
+   * archive directories of the table and, on request, the directories of its snapshots.
+   * @param tableName the table
+   * @param includeSnapshotPath true if return table snapshots paths, otherwise false
+   * @return the path list
+   * @throws IOException if an error occurred
+   */
+  List<Path> getTableRootPaths(TableName tableName, boolean includeSnapshotPath)
+      throws IOException {
+    List<Path> tableRoots = Lists.newArrayList(pathHelper.getTmpTableDir(tableName),
+      pathHelper.getDataTableDir(tableName), pathHelper.getMobTableDir(tableName),
+      pathHelper.getArchiveTableDir(tableName));
+    if (!includeSnapshotPath) {
+      return tableRoots;
+    }
+    tableRoots.addAll(getTableSnapshotPaths(tableName));
+    return tableRoots;
+  }
+
+  /**
+   * Return the snapshot directories of all snapshots, as listed by the admin, that were taken
+   * of the given table.
+   * @param tableName the table
+   * @return the snapshot directories, empty if the table has no snapshots
+   * @throws IOException if the snapshots cannot be listed
+   */
+  private List<Path> getTableSnapshotPaths(TableName tableName) throws IOException {
+    List<Path> snapshotPaths = new ArrayList<>();
+    for (SnapshotDescription snapshot : admin.listSnapshots()) {
+      if (snapshot.getTableName().equals(tableName)) {
+        snapshotPaths.add(pathHelper.getSnapshotDir(snapshot.getName()));
+      }
+    }
+    return snapshotPaths;
+  }
+
+  /**
+   * Collect the users whose permission on the namespace, as stored in the permission storage,
+   * implies the READ action.
+   * @param namespace the namespace
+   * @return users with namespace read permission
+   * @throws IOException if an error occurred
+   */
+  private Set<String> getUsersWithNamespaceReadAction(String namespace) throws IOException {
+    return PermissionStorage.getNamespacePermissions(conf, namespace).entries().stream()
+        .filter(userAndPermission -> userAndPermission.getValue().getPermission().implies(READ))
+        .map(userAndPermission -> userAndPermission.getKey())
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Collect the users whose permission on the table, as stored in the permission storage,
+   * implies the READ action.
+   * @param tableName the table
+   * @return users with table read permission
+   * @throws IOException if an error occurred
+   */
+  private Set<String> getUsersWithTableReadAction(TableName tableName) throws IOException {
+    return PermissionStorage.getTablePermissions(conf, tableName).entries().stream()
+        .filter(userAndPermission -> userAndPermission.getValue().getPermission().implies(READ))
+        .map(userAndPermission -> userAndPermission.getKey())
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * @return the path helper this helper uses to resolve its directories
+   */
+  PathHelper getPathHelper() {
+    return pathHelper;
+  }
+
+  /**
+   * Asynchronously apply one acl operation on the worker pool and then, also on the pool, apply
+   * its child operations in parallel.
+   * @param acl the operation to apply
+   * @return a future that completes when the operation and all of its children are handled
+   */
+  private CompletableFuture<Void> handleHDFSAcl(HDFSAclOperation acl) {
+    return CompletableFuture.supplyAsync(() -> applyAclAndListChildren(acl), pool)
+        .thenComposeAsync(this::handleHDFSAclParallel, pool);
+  }
+
+  /**
+   * Apply the operation and return its child operations. A missing path is silently skipped and
+   * any other IOException is logged; in both cases no child operations are returned.
+   */
+  private List<HDFSAclOperation> applyAclAndListChildren(HDFSAclOperation acl) {
+    List<HDFSAclOperation> children = new ArrayList<>();
+    try {
+      acl.handleAcl();
+      children = acl.getChildAclOperations();
+    } catch (FileNotFoundException e) {
+      // Skip handle acl if file not found
+    } catch (IOException e) {
+      LOG.error("Set HDFS acl error for path {}", acl.path, e);
+    }
+    return children;
+  }
+
+  private CompletableFuture<Void> handleHDFSAclSequential(List<HDFSAclOperation> operations) {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        for (HDFSAclOperation hdfsAclOperation : operations) {
+          handleHDFSAcl(hdfsAclOperation).get();
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Set HDFS acl error", e);
+      }
+      return null;
+    }, pool);
+  }
+
+  /**
+   * Start every given acl operation without waiting for the previous one.
+   * @param operations the operations to start
+   * @return a future that completes when all of the started operations have completed
+   */
+  private CompletableFuture<Void> handleHDFSAclParallel(List<HDFSAclOperation> operations) {
+    CompletableFuture<?>[] pending = new CompletableFuture<?>[operations.size()];
+    int index = 0;
+    for (HDFSAclOperation operation : operations) {
+      pending[index++] = handleHDFSAcl(operation);
+    }
+    return CompletableFuture.allOf(pending);
+  }
+
+  /**
+   * Build a READ_EXECUTE acl entry for a principal.
+   * @param scope the scope of the entry
+   * @param name the principal; the entry type is GROUP when
+   *          {@link AuthUtil#isGroupPrincipal(String)} accepts the name and USER otherwise
+   * @return the acl entry
+   */
+  private static AclEntry aclEntry(AclEntryScope scope, String name) {
+    AclEntry.Builder builder = new AclEntry.Builder();
+    builder.setScope(scope);
+    if (AuthUtil.isGroupPrincipal(name)) {
+      builder.setType(GROUP);
+    } else {
+      builder.setType(USER);
+    }
+    builder.setName(name);
+    builder.setPermission(READ_EXECUTE);
+    return builder.build();
+  }
+
+  /**
+   * Describes one acl change on a path: whether acl entries are modified or removed, which kind
+   * of entries (ACCESS, DEFAULT, or both) a directory receives, and whether the same change is
+   * repeated for the children of the path. A path that is not a directory always receives the
+   * ACCESS entries.
+   */
+  private static class HDFSAclOperation {
+    /** Whether the entries are merged into, or removed from, the acl of the path. */
+    enum OperationType {
+      MODIFY, REMOVE
+    }
+
+    /** The kind of acl entries that is applied on a directory. */
+    enum AclType {
+      ACCESS, DEFAULT, DEFAULT_ADN_ACCESS
+    }
+
+    /** The file system call that applies a list of acl entries on a path. */
+    private interface Operation {
+      void apply(FileSystem fs, Path path, List<AclEntry> aclList) throws IOException;
+    }
+
+    private FileSystem fs;
+    private Path path;
+    private Operation operation;
+    private boolean recursive;
+    private AclType aclType;
+    private List<AclEntry> defaultAndAccessAclEntries;
+    private List<AclEntry> accessAclEntries;
+    private List<AclEntry> defaultAclEntries;
+
+    /**
+     * Create an operation for a path.
+     * @param fs the file system the path lives on
+     * @param path the path to change
+     * @param users the principals the acl entries are built for
+     * @param operationType modify or remove
+     * @param recursive whether child operations are produced for a directory
+     * @param aclType the kind of entries applied when the path is a directory
+     */
+    HDFSAclOperation(FileSystem fs, Path path, Set<String> users, OperationType operationType,
+        boolean recursive, AclType aclType) {
+      this.fs = fs;
+      this.path = path;
+      // The three entry lists are built up front so that child operations can share them
+      this.defaultAndAccessAclEntries = getAclEntries(AclType.DEFAULT_ADN_ACCESS, users);
+      this.accessAclEntries = getAclEntries(AclType.ACCESS, users);
+      this.defaultAclEntries = getAclEntries(AclType.DEFAULT, users);
+      if (operationType == OperationType.REMOVE) {
+        this.operation =
+            (fileSystem, target, entries) -> fileSystem.removeAclEntries(target, entries);
+      } else if (operationType == OperationType.MODIFY) {
+        this.operation =
+            (fileSystem, target, entries) -> fileSystem.modifyAclEntries(target, entries);
+      } else {
+        throw new IllegalArgumentException("Illegal HDFS acl operation type: " + operationType);
+      }
+      this.recursive = recursive;
+      this.aclType = aclType;
+    }
+
+    /**
+     * Create the operation for a child path; everything except the path is taken from the parent.
+     * @param path the child path
+     * @param parent the operation of the parent directory
+     */
+    HDFSAclOperation(Path path, HDFSAclOperation parent) {
+      this.path = path;
+      this.fs = parent.fs;
+      this.operation = parent.operation;
+      this.recursive = parent.recursive;
+      this.aclType = parent.aclType;
+      this.accessAclEntries = parent.accessAclEntries;
+      this.defaultAclEntries = parent.defaultAclEntries;
+      this.defaultAndAccessAclEntries = parent.defaultAndAccessAclEntries;
+    }
+
+    /**
+     * @return one operation per entry listed under the path when this operation is recursive and
+     *         the path is a directory, otherwise an empty list
+     * @throws IOException if the path cannot be inspected or listed
+     */
+    List<HDFSAclOperation> getChildAclOperations() throws IOException {
+      if (!recursive || !fs.isDirectory(path)) {
+        return new ArrayList<>();
+      }
+      List<HDFSAclOperation> children = new ArrayList<>();
+      for (FileStatus child : fs.listStatus(path)) {
+        children.add(new HDFSAclOperation(child.getPath(), this));
+      }
+      return children;
+    }
+
+    /**
+     * Apply the operation on the path. Nothing is done when the path does not exist.
+     * @throws IOException if the file system call fails
+     */
+    void handleAcl() throws IOException {
+      if (!fs.exists(path)) {
+        return;
+      }
+      if (!fs.isDirectory(path)) {
+        operation.apply(fs, path, accessAclEntries);
+        return;
+      }
+      List<AclEntry> directoryEntries;
+      switch (aclType) {
+        case ACCESS:
+          directoryEntries = accessAclEntries;
+          break;
+        case DEFAULT:
+          directoryEntries = defaultAclEntries;
+          break;
+        case DEFAULT_ADN_ACCESS:
+          directoryEntries = defaultAndAccessAclEntries;
+          break;
+        default:
+          throw new IllegalArgumentException("Illegal HDFS acl type: " + aclType);
+      }
+      operation.apply(fs, path, directoryEntries);
+    }
+
+    /**
+     * Build the acl entries of the requested kind for every principal. When both kinds are
+     * requested, each principal contributes its ACCESS entry followed by its DEFAULT entry.
+     */
+    private List<AclEntry> getAclEntries(AclType aclType, Set<String> users) {
+      final boolean withAccess;
+      final boolean withDefault;
+      switch (aclType) {
+        case ACCESS:
+          withAccess = true;
+          withDefault = false;
+          break;
+        case DEFAULT:
+          withAccess = false;
+          withDefault = true;
+          break;
+        case DEFAULT_ADN_ACCESS:
+          withAccess = true;
+          withDefault = true;
+          break;
+        default:
+          throw new IllegalArgumentException("Illegal HDFS acl type: " + aclType);
+      }
+      List<AclEntry> entries = new ArrayList<>();
+      for (String user : users) {
+        if (withAccess) {
+          entries.add(aclEntry(ACCESS, user));
+        }
+        if (withDefault) {
+          entries.add(aclEntry(DEFAULT, user));
+        }
+      }
+      return entries;
+    }
+  }
+
+  static final class PathHelper {
+    Configuration conf;
+    Path rootDir;
+    Path tmpDataDir;
+    Path dataDir;
+    Path mobDataDir;
+    Path archiveDataDir;
+    Path snapshotDir;
+
+    PathHelper(Configuration conf) {
+      this.conf = conf;
+      rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+      tmpDataDir = new Path(new Path(rootDir, HConstants.HBASE_TEMP_DIRECTORY),
+          HConstants.BASE_NAMESPACE_DIR);
+      dataDir = new Path(rootDir, HConstants.BASE_NAMESPACE_DIR);
+      mobDataDir = new Path(MobUtils.getMobHome(rootDir), HConstants.BASE_NAMESPACE_DIR);
+      archiveDataDir = new Path(new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY),
+          HConstants.BASE_NAMESPACE_DIR);
+      snapshotDir = new Path(rootDir, HConstants.SNAPSHOT_DIR_NAME);
+    }
+
+    Path getRootDir() {
+      return rootDir;
+    }
+
+    Path getDataDir() {
+      return dataDir;
+    }
+
+    Path getMobDir() {
+      return mobDataDir.getParent();
+    }
+
+    Path getMobDataDir() {
+      return mobDataDir;
+    }
+
+    Path getTmpDir() {
+      return new Path(rootDir, HConstants.HBASE_TEMP_DIRECTORY);
+    }
+
+    Path getTmpDataDir() {
+      return tmpDataDir;
+    }
+
+    Path getArchiveDir() {
+      return new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY);
+    }
+
+    Path getArchiveDataDir() {
+      return archiveDataDir;
+    }
+
+    Path getDataNsDir(String namespace) {
+      return new Path(dataDir, namespace);
+    }
+
+    Path getMobDataNsDir(String namespace) {
+      return new Path(mobDataDir, namespace);
+    }
+
+    Path getDataTableDir(TableName tableName) {
+      return new Path(getDataNsDir(tableName.getNamespaceAsString()),
+          tableName.getQualifierAsString());
+    }
+
+    Path getMobTableDir(TableName tableName) {
+      return new Path(getMobDataNsDir(tableName.getNamespaceAsString()),
+          tableName.getQualifierAsString());
+    }
+
+    Path getArchiveNsDir(String namespace) {
+      return new Path(archiveDataDir, namespace);
+    }
+
+    Path getArchiveTableDir(TableName tableName) {
+      return new Path(getArchiveNsDir(tableName.getNamespaceAsString()),
+          tableName.getQualifierAsString());
+    }
+
+    Path getTmpNsDir(String namespace) {
+      return new Path(tmpDataDir, namespace);
+    }
+
+    Path getTmpTableDir(TableName tableName) {
+      return new Path(getTmpNsDir(tableName.getNamespaceAsString()),
+          tableName.getQualifierAsString());
+    }
+
+    Path getSnapshotRootDir() {
+      return snapshotDir;
+    }
+
+    Path getSnapshotDir(String snapshot) {
+      return new Path(snapshotDir, snapshot);
+    }
+
+    FileSystem getFileSystem() throws IOException {
+      return rootDir.getFileSystem(conf);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java
new file mode 100644
index 0000000..e35e3a1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import static org.apache.hadoop.hbase.security.access.Permission.Action.READ;
+import static org.apache.hadoop.hbase.security.access.Permission.Action.WRITE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ SecurityTests.class, LargeTests.class })
+public class TestSnapshotScannerHDFSAclController {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSnapshotScannerHDFSAclController.class);
+  @Rule
+  public TestName name = new TestName();
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSnapshotScannerHDFSAclController.class);
+
+  // Name of the user behind unGrantUser; the visible tests never grant it anything
+  private static final String UN_GRANT_USER = "un_grant_user";
+  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf = TEST_UTIL.getConfiguration();
+  // The fields below are assigned in setupBeforeClass() once the mini cluster has been started
+  private static Admin admin = null;
+  private static FileSystem fs = null;
+  private static Path rootDir = null;
+  private static User unGrantUser = null;
+
+  /**
+   * Configure HDFS acls, security and the SnapshotScannerHDFSAclController coprocessor, start
+   * the mini cluster, and set the directory permissions on the HBase root directory, the
+   * snapshot restore directory and all of their ancestors.
+   * @throws Exception if the cluster cannot be started or a file system call fails
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // enable hdfs acl and set umask to 027
+    conf.setBoolean("dfs.namenode.acls.enabled", true);
+    conf.set("fs.permissions.umask-mode", "027");
+    // enable hbase hdfs acl feature
+    conf.setBoolean(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, true);
+    // enable secure
+    conf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+    conf.set(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR,
+      SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
+    SecureTestUtil.enableSecurity(conf);
+    // add SnapshotScannerHDFSAclController coprocessor
+    // NOTE(review): this appends to the current master coprocessor value, so it presumably relies
+    // on enableSecurity() above having set that key - confirm against SecureTestUtil
+    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+      conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) + ","
+          + SnapshotScannerHDFSAclController.class.getName());
+
+    TEST_UTIL.startMiniCluster();
+    admin = TEST_UTIL.getAdmin();
+    rootDir = TEST_UTIL.getDefaultRootDirPath();
+    fs = rootDir.getFileSystem(conf);
+    unGrantUser = User.createUserForTesting(conf, UN_GRANT_USER, new String[] {});
+
+    // set hbase directory permission
+    FsPermission commonDirectoryPermission =
+        new FsPermission(conf.get(SnapshotScannerHDFSAclHelper.COMMON_DIRECTORY_PERMISSION,
+          SnapshotScannerHDFSAclHelper.COMMON_DIRECTORY_PERMISSION_DEFAULT));
+    // walk from the root dir up to the file system root
+    Path path = rootDir;
+    while (path != null) {
+      fs.setPermission(path, commonDirectoryPermission);
+      path = path.getParent();
+    }
+    // set restore directory permission
+    Path restoreDir = new Path(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
+    if (!fs.exists(restoreDir)) {
+      fs.mkdirs(restoreDir);
+      fs.setPermission(restoreDir,
+        new FsPermission(
+            conf.get(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
+              SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT)));
+    }
+    // the ancestors of the restore dir get the common directory permission
+    path = restoreDir.getParent();
+    while (path != null) {
+      fs.setPermission(path, commonDirectoryPermission);
+      path = path.getParent();
+    }
+
+    TEST_UTIL.waitTableAvailable(PermissionStorage.ACL_TABLE_NAME);
+  }
+
+  /**
+   * Shut down the mini cluster that was started in {@link #setupBeforeClass()}.
+   * @throws Exception if the shutdown fails
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Checks which snapshots a user can scan after global grants, alone and combined with
+   * namespace and table grants. In the case comments G, N and T stand for a global, namespace
+   * and table grant, and (R) / (W) for the READ / WRITE action.
+   * <p>
+   * NOTE(review): the last argument of canUserScanSnapshot looks like the expected row count,
+   * with -1 meaning the user is expected to be unable to scan - confirm against TestHDFSAclHelper.
+   */
+  @Test
+  public void testGrantGlobal() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+
+    String namespace1 = name.getMethodName();
+    String namespace2 = namespace1 + "2";
+    String namespace3 = namespace1 + "3";
+    TableName table1 = TableName.valueOf(namespace1, "t1");
+    TableName table12 = TableName.valueOf(namespace1, "t2");
+    TableName table21 = TableName.valueOf(namespace2, "t21");
+    TableName table3 = TableName.valueOf(namespace3, "t3");
+    TableName table31 = TableName.valueOf(namespace3, "t31");
+    // all snapshot names except snapshot21 are prefixed with namespace1, whatever the
+    // namespace of the table they are taken from
+    String snapshot1 = namespace1 + "t1";
+    String snapshot12 = namespace1 + "t12";
+    String snapshot2 = namespace1 + "t2";
+    String snapshot21 = namespace2 + "t21";
+    String snapshot3 = namespace1 + "t3";
+    String snapshot31 = namespace1 + "t31";
+
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
+    admin.snapshot(snapshot1, table1);
+
+    // case 1: grant G(R) -> grant G(W) -> grant G(R)
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    // grant G(W) with the second argument of Admin.grant set to true; the snapshot stays
+    // scannable afterwards
+    admin.grant(
+      new UserPermission(grantUserName, Permission.newBuilder().withActions(WRITE).build()), true);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, WRITE);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    admin.snapshot(snapshot12, table1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot12, 6);
+
+    // case 2: grant G(R),N(R) -> G(W)
+    admin.grant(new UserPermission(grantUserName,
+        Permission.newBuilder(namespace1).withActions(READ).build()),
+      false);
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, WRITE);
+    // table in ns1
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table12);
+    admin.snapshot(snapshot2, table12);
+    // table in ns2
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table21);
+    admin.snapshot(snapshot21, table21);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot21, -1);
+
+    // case 3: grant G(R),T(R) -> G(W)
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
+    admin.snapshot(snapshot3, table3);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table3, READ);
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, WRITE);
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table31);
+    admin.snapshot(snapshot31, table31);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot31, -1);
+  }
+
+  /**
+   * Checks which snapshots a user can scan after namespace grants, alone and combined with
+   * table and global grants. In the case comments G, N and T stand for a global, namespace and
+   * table grant, and (R) / (W) for the READ / WRITE action.
+   * <p>
+   * NOTE(review): the last argument of canUserScanSnapshot looks like the expected row count,
+   * with -1 meaning the user is expected to be unable to scan - confirm against TestHDFSAclHelper.
+   */
+  @Test
+  public void testGrantNamespace() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    TableName table2 = TableName.valueOf(namespace, "t2");
+    TableName table3 = TableName.valueOf(namespace, "t3");
+    String snapshot = namespace + "t1";
+    String snapshot2 = namespace + "t2";
+    String snapshot3 = namespace + "t3";
+
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    admin.snapshot(snapshot, table);
+
+    // case 1: grant N(R) -> grant N(W) -> grant N(R)
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    // table3 is created and snapshotted after the namespace grant
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
+    admin.snapshot(snapshot3, table3);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, unGrantUser, snapshot, -1);
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, WRITE);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+
+    // case 2: grant T(R) -> N(W)
+    // N(R) was already granted as the last step of case 1; it is granted once more here
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, WRITE);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
+    admin.snapshot(snapshot2, table2);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, WRITE);
+
+    // case 3: grant G(R) -> N(W)
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, WRITE);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, 6);
+  }
+
+  /**
+   * Checks which snapshots a user can scan after column family and table level grants, and
+   * after READ is followed by a WRITE grant with and without merging existing permissions.
+   * T stands for a table grant and (R) / (W) for the READ / WRITE action.
+   * <p>
+   * NOTE(review): the last argument of canUserScanSnapshot looks like the expected row count,
+   * with -1 meaning the user is expected to be unable to scan - confirm against TestHDFSAclHelper.
+   */
+  @Test
+  public void testGrantTable() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    TableName table2 = TableName.valueOf(namespace, "t2");
+    // snapshot and snapshot2 are taken from t1, snapshot3 from t2
+    String snapshot = namespace + "t1";
+    String snapshot2 = namespace + "t1-2";
+    String snapshot3 = namespace + "t2";
+
+    try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, table)) {
+      TestHDFSAclHelper.put(t);
+      admin.snapshot(snapshot, table);
+      // table owner can scan table snapshot
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL,
+        User.createUserForTesting(conf, "owner", new String[] {}), snapshot, 6);
+      // case 1: grant table family(R)
+      SecureTestUtil.grantOnTable(TEST_UTIL, grantUserName, table, TestHDFSAclHelper.COLUMN1, null,
+        READ);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
+      // case 2: grant T(R)
+      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
+      TestHDFSAclHelper.put2(t);
+      admin.snapshot(snapshot2, table);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 10);
+    }
+    // create t2 and snapshot
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
+    admin.snapshot(snapshot3, table2);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
+
+    // case 3: grant T(R) -> grant T(W) with merging existing permissions
+    TEST_UTIL.getAdmin().grant(
+      new UserPermission(grantUserName, Permission.newBuilder(table).withActions(WRITE).build()),
+      true);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+
+    // case 4: grant T(R) -> grant T(W) without merging existing permissions
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, WRITE);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
+  }
+
+  /**
+   * Checks which snapshots a user can still scan after the global READ grant is revoked while
+   * namespace and / or table grants remain. In the case comments G, N and T stand for a global,
+   * namespace and table grant, and (R) for the READ action.
+   * <p>
+   * NOTE(review): the last argument of canUserScanSnapshot looks like the expected row count,
+   * with -1 meaning the user is expected to be unable to scan - confirm against TestHDFSAclHelper.
+   */
+  @Test
+  public void testRevokeGlobal() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+
+    String namespace = name.getMethodName();
+    TableName table1 = TableName.valueOf(namespace, "t1");
+    TableName table2 = TableName.valueOf(namespace, "t2");
+    TableName table3 = TableName.valueOf(namespace, "t3");
+    String snapshot1 = namespace + "t1";
+    String snapshot2 = namespace + "t2";
+    String snapshot3 = namespace + "t3";
+
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
+    admin.snapshot(snapshot1, table1);
+    // case 1: grant G(R) -> revoke G(R)
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
+
+    // case 2: grant G(R), grant N(R), grant T(R) -> revoke G(R)
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, READ);
+    SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
+    admin.snapshot(snapshot2, table2);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
+    // revoking N(R) leaves only the T(R) grant on table1 for case 3
+    SecureTestUtil.revokeFromNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
+
+    // case 3: grant G(R), grant T(R) -> revoke G(R)
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
+    admin.snapshot(snapshot3, table3);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
+  }
+
+  /**
+   * Checks which snapshots a user can still scan after the namespace grant is revoked while
+   * global or table grants remain. In the case comments G, N and T stand for a global,
+   * namespace and table grant, and (R) for the READ action.
+   * <p>
+   * NOTE(review): the last argument of canUserScanSnapshot looks like the expected row count,
+   * with -1 meaning the user is expected to be unable to scan - confirm against TestHDFSAclHelper.
+   */
+  @Test
+  public void testRevokeNamespace() throws Exception {
+    String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+
+    String namespace = name.getMethodName();
+    TableName table1 = TableName.valueOf(namespace, "t1");
+    TableName table2 = TableName.valueOf(namespace, "t2");
+    TableName table3 = TableName.valueOf(namespace, "t3");
+    TableName table4 = TableName.valueOf(namespace, "t4");
+    String snapshot1 = namespace + "t1";
+    String snapshot2 = namespace + "t2";
+    String snapshot3 = namespace + "t3";
+    String snapshot4 = namespace + "t4";
+
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
+    admin.snapshot(snapshot1, table1);
+
+    // case 1: grant N(R) -> revoke N(R)
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(namespace).build()));
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
+    admin.snapshot(snapshot3, table3);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
+
+    // case 2: grant N(R), grant G(R) -> revoke N(R)
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(namespace).build()));
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table4);
+    admin.snapshot(snapshot4, table4);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot4, 6);
+    SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
+
+    // case 3: grant N(R), grant T(R) -> revoke N(R)
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, READ);
+    SecureTestUtil.revokeFromNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    // t2 is created without data; close the returned Table instead of dropping it unclosed
+    try (Table ignored = TestHDFSAclHelper.createTable(TEST_UTIL, table2)) {
+      admin.snapshot(snapshot2, table2);
+    }
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
+  }
+
+  /**
+   * Checks whether a user can still scan a snapshot after the table grant is revoked, alone and
+   * while a namespace or global grant remains. In the case comments G, N and T stand for a
+   * global, namespace and table grant, and (R) for the READ action.
+   * <p>
+   * NOTE(review): the last argument of canUserScanSnapshot looks like the expected row count,
+   * with -1 meaning the user is expected to be unable to scan - confirm against TestHDFSAclHelper.
+   */
+  @Test
+  public void testRevokeTable() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot = namespace + "t1";
+
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    admin.snapshot(snapshot, table);
+
+    // case 1: grant T(R) -> revoke table family
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
+    SecureTestUtil.revokeFromTable(TEST_UTIL, grantUserName, table, TestHDFSAclHelper.COLUMN1, null,
+      READ);
+    // the column family level revoke leaves the snapshot scannable
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+
+    // case 2: grant T(R) -> revoke T(R)
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
+    admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(table).build()));
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
+
+    // case 3: grant T(R), grant N(R) -> revoke T(R)
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(table).build()));
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+    SecureTestUtil.revokeFromNamespace(TEST_UTIL, grantUserName, namespace, READ);
+
+    // case 4: grant T(R), grant G(R) -> revoke T(R)
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(table).build()));
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+    SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
+  }
+
+  /**
+   * Checks that a user with a table READ grant and a user with a namespace READ grant can scan
+   * snapshots taken both before and after the table is truncated.
+   * <p>
+   * NOTE(review): the last argument of canUserScanSnapshot looks like the expected row count -
+   * confirm against TestHDFSAclHelper.
+   */
+  @Test
+  public void testTruncateTable() throws Exception {
+    String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String grantUserName2 = grantUserName + "2";
+    User grantUser2 = User.createUserForTesting(conf, grantUserName2, new String[] {});
+
+    String namespace = name.getMethodName();
+    TableName tableName = TableName.valueOf(namespace, "t1");
+    // snapshot is taken before the truncate, snapshot2 after it
+    String snapshot = namespace + "t1";
+    String snapshot2 = namespace + "t1-2";
+    try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, tableName)) {
+      TestHDFSAclHelper.put(t);
+      // snapshot
+      admin.snapshot(snapshot, tableName);
+      // grant user2 namespace permission
+      SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName2, namespace, READ);
+      // grant user table permission
+      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, tableName, READ);
+      // truncate table
+      admin.disableTable(tableName);
+      admin.truncateTable(tableName, true);
+      TestHDFSAclHelper.put2(t);
+      // snapshot
+      admin.snapshot(snapshot2, tableName);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser2, snapshot, 6);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 9);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser2, snapshot2, 9);
+    }
+  }
+
+  /**
+   * Checks which snapshots a user with a table READ grant can scan after the table is deleted
+   * and then restored from a snapshot, once with and once without restoring the acl.
+   * <p>
+   * NOTE(review): the last argument of canUserScanSnapshot looks like the expected row count,
+   * with -1 meaning the user is expected to be unable to scan - confirm against TestHDFSAclHelper.
+   */
+  @Test
+  public void testRestoreSnapshot() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot = namespace + "t1";
+    String snapshot2 = namespace + "t1-2";
+    String snapshot3 = namespace + "t1-3";
+
+    try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, table)) {
+      TestHDFSAclHelper.put(t);
+      // grant t1, snapshot
+      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
+      admin.snapshot(snapshot, table);
+      // delete
+      admin.disableTable(table);
+      admin.deleteTable(table);
+      // the snapshot stays scannable after its table has been deleted
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+
+      // restore snapshot and restore acl
+      admin.restoreSnapshot(snapshot, true, true);
+      TestHDFSAclHelper.put2(t);
+      // snapshot
+      admin.snapshot(snapshot2, table);
+      // delete
+      admin.disableTable(table);
+      admin.deleteTable(table);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 10);
+
+      // restore snapshot and skip restore acl
+      admin.restoreSnapshot(snapshot);
+      admin.snapshot(snapshot3, table);
+
+      // only the snapshot taken after the restore without acl is expected to be unscannable
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 10);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
+    }
+  }
+
+  /** Deleting one table must leave its own snapshot and its sibling's snapshot scannable. */
+  @Test
+  public void testDeleteTable() throws Exception {
+    String ns = name.getMethodName();
+    String userName1 = ns + "1";
+    String userName2 = ns + "2";
+    String userName3 = ns + "3";
+    User user1 = User.createUserForTesting(conf, userName1, new String[] {});
+    User user2 = User.createUserForTesting(conf, userName2, new String[] {});
+    User user3 = User.createUserForTesting(conf, userName3, new String[] {});
+    TableName table1 = TableName.valueOf(ns, "t1");
+    TableName table2 = TableName.valueOf(ns, "t2");
+    String snap1 = ns + "t1";
+    String snap2 = ns + "t2";
+
+    try (Table first = TestHDFSAclHelper.createTable(TEST_UTIL, table1);
+        Table second = TestHDFSAclHelper.createTable(TEST_UTIL, table2)) {
+      TestHDFSAclHelper.put(first);
+      TestHDFSAclHelper.put(second);
+      admin.snapshot(snap1, table1);
+      admin.snapshot(snap2, table2);
+      // user1 and user2 get table-level READ, user3 gets namespace-level READ
+      TestHDFSAclHelper.grantOnTable(TEST_UTIL, userName1, table1, READ);
+      TestHDFSAclHelper.grantOnTable(TEST_UTIL, userName2, table2, READ);
+      SecureTestUtil.grantOnNamespace(TEST_UTIL, userName3, ns, READ);
+      // drop only the first table
+      admin.disableTable(table1);
+      admin.deleteTable(table1);
+      // all three can still scan: user1 the dropped table's snapshot, user2/user3 the other one
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, user1, snap1, 6);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, user2, snap2, 6);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, user3, snap2, 6);
+    }
+  }
+
+  /** A snapshot stays scannable for a namespace-granted user after the namespace is deleted. */
+  @Test
+  public void testDeleteNamespace() throws Exception {
+    String ns = name.getMethodName();
+    String userName = name.getMethodName();
+    User reader = User.createUserForTesting(conf, userName, new String[] {});
+    TableName table = TableName.valueOf(ns, "t1");
+    String snapshotName = ns + "t1";
+
+    try (Table tbl = TestHDFSAclHelper.createTable(TEST_UTIL, table)) {
+      TestHDFSAclHelper.put(tbl);
+      admin.snapshot(snapshotName, table);
+      // grant the user READ on the whole namespace
+      SecureTestUtil.grantOnNamespace(TEST_UTIL, userName, ns, READ);
+      // delete the table, then the namespace itself
+      admin.disableTable(table);
+      admin.deleteTable(table);
+      admin.deleteNamespace(ns);
+      // the earlier snapshot must still return all 6 rows to the user
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, reader, snapshotName, 6);
+    }
+  }
+
+  /** Granting table READ on a MOB-enabled table lets the user scan a snapshot taken before. */
+  @Test
+  public void testGrantMobTable() throws Exception {
+    String ns = name.getMethodName();
+    TableName mobTable = TableName.valueOf(ns, "t1");
+    String snapshotName = ns + "t1";
+    String userName = name.getMethodName();
+    User reader = User.createUserForTesting(conf, userName, new String[] {});
+
+    try (Table tbl = TestHDFSAclHelper.createMobTable(TEST_UTIL, mobTable)) {
+      TestHDFSAclHelper.put(tbl);
+      admin.snapshot(snapshotName, mobTable);
+      TestHDFSAclHelper.grantOnTable(TEST_UTIL, userName, mobTable, READ);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, reader, snapshotName, 6);
+    }
+  }
+}
+
+/** Static helpers shared by the snapshot-scan ACL tests; not instantiable. */
+final class TestHDFSAclHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(TestHDFSAclHelper.class);
+  static final byte[] COLUMN1 = Bytes.toBytes("A");
+  static final byte[] COLUMN2 = Bytes.toBytes("B");
+
+  private TestHDFSAclHelper() {
+  }
+
+  /** Grants {@code actions} on the whole table (null family and qualifier) to {@code user}. */
+  static void grantOnTable(HBaseTestingUtility util, String user, TableName tableName,
+      Permission.Action... actions) throws Exception {
+    SecureTestUtil.grantOnTable(util, user, tableName, null, null, actions);
+  }
+
+  /** Creates {@code namespace} unless a namespace with that name is already listed. */
+  private static void createNamespace(HBaseTestingUtility util, String namespace)
+      throws IOException {
+    if (Arrays.stream(util.getAdmin().listNamespaceDescriptors())
+        .noneMatch(ns -> ns.getName().equals(namespace))) {
+      util.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
+    }
+  }
+
+  /** Creates a two-family (A, B) table pre-split at "2" and "4", plus its namespace if absent. */
+  static Table createTable(HBaseTestingUtility util, TableName tableName) throws IOException {
+    return createTable(util, tableName, false);
+  }
+
+  /** Same as {@link #createTable} but both families are MOB-enabled with threshold 0. */
+  static Table createMobTable(HBaseTestingUtility util, TableName tableName) throws IOException {
+    return createTable(util, tableName, true);
+  }
+
+  /** Shared body of the two factories above; {@code mob} turns MOB on for both families. */
+  private static Table createTable(HBaseTestingUtility util, TableName tableName, boolean mob)
+      throws IOException {
+    createNamespace(util, tableName.getNamespaceAsString());
+    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName)
+        .setOwner(User.createUserForTesting(util.getConfiguration(), "owner", new String[] {}));
+    for (byte[] family : new byte[][] { COLUMN1, COLUMN2 }) {
+      ColumnFamilyDescriptorBuilder cf = ColumnFamilyDescriptorBuilder.newBuilder(family);
+      tdBuilder.setColumnFamily((mob ? cf.setMobEnabled(true).setMobThreshold(0) : cf).build());
+    }
+    TableDescriptor td = tdBuilder.build();
+    return util.createTable(td, new byte[][] { Bytes.toBytes("2"), Bytes.toBytes("4") });
+  }
+
+  /** Creates the table via {@link #createTable} and loads the {@link #put} rows into it. */
+  static void createTableAndPut(HBaseTestingUtility util, TableName tableNam) throws IOException {
+    try (Table t = createTable(util, tableNam)) {
+      put(t);
+    }
+  }
+
+  /** Writes 6 rows keyed 0..5; row i gets A = i and B = i + 1 (null qualifier). */
+  static void put(Table hTable) throws IOException {
+    List<Put> puts = new ArrayList<>();
+    for (int i = 0; i < 6; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.addColumn(COLUMN1, null, Bytes.toBytes(i));
+      put.addColumn(COLUMN2, null, Bytes.toBytes(i + 1));
+      puts.add(put);
+    }
+    hTable.put(puts);
+  }
+
+  /** Writes 9 rows keyed 0..9 except 5; row i gets A = i + 2 and B = i + 3 (null qualifier). */
+  static void put2(Table hTable) throws IOException {
+    List<Put> puts = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      if (i == 5) {
+        continue;
+      }
+      Put put = new Put(Bytes.toBytes(i));
+      put.addColumn(COLUMN1, null, Bytes.toBytes(i + 2));
+      put.addColumn(COLUMN2, null, Bytes.toBytes(i + 3));
+      puts.add(put);
+    }
+    hTable.put(puts);
+  }
+
+  /**
+   * Check if user is able to read expected rows from the specific snapshot
+   * @param util supplies the configuration handed to the snapshot scanner
+   * @param user the specific user
+   * @param snapshot the snapshot to be scanned
+   * @param expectedRowCount expected row count read from snapshot, -1 if the scan is expected
+   *          to fail with an exception (e.g. AccessControlException)
+   * @throws IOException user scan snapshot error
+   * @throws InterruptedException user scan snapshot error
+   */
+  static void canUserScanSnapshot(HBaseTestingUtility util, User user, String snapshot,
+      int expectedRowCount) throws IOException, InterruptedException {
+    user.runAs(getScanSnapshotAction(util.getConfiguration(), snapshot, expectedRowCount));
+  }
+
+  /**
+   * Builds the action that scans the snapshot and asserts the row count. The scanner is always
+   * closed; a scan exception is only accepted when -1 rows were expected.
+   */
+  private static PrivilegedExceptionAction<Void> getScanSnapshotAction(Configuration conf,
+      String snapshotName, long expectedRowCount) {
+    return () -> {
+      try {
+        Path restoreDir = new Path(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
+        int rowCount = 0;
+        try (TableSnapshotScanner scanner =
+            new TableSnapshotScanner(conf, restoreDir, snapshotName, new Scan())) {
+          for (Result row = scanner.next(); row != null; row = scanner.next()) {
+            rowCount++;
+          }
+        }
+        assertEquals(expectedRowCount, rowCount);
+      } catch (Exception e) {
+        LOG.debug("Scan snapshot error, snapshot {}", snapshotName, e);
+        assertEquals("Scan of snapshot " + snapshotName + " failed: " + e, expectedRowCount, -1);
+      }
+      return null;
+    };
+  }
+}
\ No newline at end of file