You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by me...@apache.org on 2019/08/08 08:01:09 UTC

[hbase] branch master updated: HBASE-22776 Rename config names in user scan snapshot feature (#440)

This is an automated email from the ASF dual-hosted git repository.

meiyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e5dc6d  HBASE-22776 Rename config names in user scan snapshot feature (#440)
0e5dc6d is described below

commit 0e5dc6d7cee92524bf648b6f49d1565e098e5bc4
Author: meiyi <my...@gmail.com>
AuthorDate: Thu Aug 8 16:01:02 2019 +0800

    HBASE-22776 Rename config names in user scan snapshot feature (#440)
---
 .../hbase/master/snapshot/SnapshotManager.java     |   6 +
 .../access/SnapshotScannerHDFSAclCleaner.java      |  10 +-
 .../access/SnapshotScannerHDFSAclController.java   |  55 +-
 .../access/SnapshotScannerHDFSAclHelper.java       |  77 ++-
 .../TestSnapshotScannerHDFSAclController.java      | 698 ++++++++++++++-------
 5 files changed, 570 insertions(+), 276 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index 94d7785..b3340fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner;
+import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
@@ -1123,6 +1125,10 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
       // Inject snapshot cleaners, if snapshot.enable is true
       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
       hfileCleaners.add(HFileLinkCleaner.class.getName());
+      // If sync acl to HDFS feature is enabled, then inject the cleaner
+      if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) {
+        hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName());
+      }
 
       // Set cleaners conf
       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclCleaner.java
index 86c663d..6bf4c1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclCleaner.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -59,7 +58,7 @@ public class SnapshotScannerHDFSAclCleaner extends BaseHFileCleanerDelegate {
   @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
-    userScanSnapshotEnabled = isUserScanSnapshotEnabled(conf);
+    userScanSnapshotEnabled = SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf);
   }
 
   @Override
@@ -82,13 +81,6 @@ public class SnapshotScannerHDFSAclCleaner extends BaseHFileCleanerDelegate {
     return true;
   }
 
-  private boolean isUserScanSnapshotEnabled(Configuration conf) {
-    String masterCoprocessors = conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
-    return conf.getBoolean(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, false)
-        && masterCoprocessors.contains(SnapshotScannerHDFSAclController.class.getName())
-        && masterCoprocessors.contains(AccessController.class.getName());
-  }
-
   private boolean isEmptyArchiveDirDeletable(Path dir) {
     try {
       if (isArchiveDataDir(dir)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java
index f6d5b76..82e3430 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java
@@ -119,7 +119,7 @@ public class SnapshotScannerHDFSAclController implements MasterCoprocessor, Mast
   public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> c)
       throws IOException {
     if (c.getEnvironment().getConfiguration()
-        .getBoolean(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, false)) {
+        .getBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, false)) {
       MasterCoprocessorEnvironment mEnv = c.getEnvironment();
       if (!(mEnv instanceof HasMasterServices)) {
         throw new IOException("Does not implement HMasterServices");
@@ -133,7 +133,7 @@ public class SnapshotScannerHDFSAclController implements MasterCoprocessor, Mast
       userProvider = UserProvider.instantiate(c.getEnvironment().getConfiguration());
     } else {
       LOG.warn("Try to initialize the coprocessor SnapshotScannerHDFSAclController but failure "
-          + "because the config " + SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE
+          + "because the config " + SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE
           + " is false.");
     }
   }
@@ -213,7 +213,9 @@ public class SnapshotScannerHDFSAclController implements MasterCoprocessor, Mast
   public void postCompletedTruncateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
       TableName tableName) throws IOException {
     if (needHandleTableHdfsAcl(tableName, "truncateTable " + tableName)) {
-      // Since the table directories is recreated, so add HDFS acls again
+      // 1. create tmp table directories
+      hdfsAclHelper.createTableDirectories(tableName);
+      // 2. The table directories were recreated, so add the HDFS acls again
       Set<String> users = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
       hdfsAclHelper.addTableAcl(tableName, users, "truncate");
     }
@@ -233,9 +235,11 @@ public class SnapshotScannerHDFSAclController implements MasterCoprocessor, Mast
       try (Table aclTable =
           ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
         Set<String> users = SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName);
-        // 1. Delete table owner permission is synced to HDFS in acl table
+        // 1. Remove table archive directory default ACLs
+        hdfsAclHelper.removeTableDefaultAcl(tableName, users);
+        // 2. Delete table owner permission is synced to HDFS in acl table
         SnapshotScannerHDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName);
-        // 2. Remove namespace access acls
+        // 3. Remove namespace access acls
         Set<String> removeUsers = filterUsersToRemoveNsAccessAcl(aclTable, tableName, users);
         if (removeUsers.size() > 0) {
           hdfsAclHelper.removeNamespaceAccessAcl(tableName, removeUsers, "delete");
@@ -251,7 +255,7 @@ public class SnapshotScannerHDFSAclController implements MasterCoprocessor, Mast
     try (Table aclTable =
         ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
       if (needHandleTableHdfsAcl(currentDescriptor, "modifyTable " + tableName)
-          && !hdfsAclHelper.isTableUserScanSnapshotEnabled(oldDescriptor)) {
+          && !hdfsAclHelper.isAclSyncToHdfsEnabled(oldDescriptor)) {
         // 1. Create table directories used for acl inherited
         hdfsAclHelper.createTableDirectories(tableName);
         // 2. Add table users HDFS acls
@@ -264,7 +268,7 @@ public class SnapshotScannerHDFSAclController implements MasterCoprocessor, Mast
         SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
           tableUsers, tableName);
       } else if (needHandleTableHdfsAcl(oldDescriptor, "modifyTable " + tableName)
-          && !hdfsAclHelper.isTableUserScanSnapshotEnabled(currentDescriptor)) {
+          && !hdfsAclHelper.isAclSyncToHdfsEnabled(currentDescriptor)) {
         // 1. Remove empty table directories
         List<Path> tableRootPaths = hdfsAclHelper.getTableRootPaths(tableName, false);
         for (Path path : tableRootPaths) {
@@ -290,17 +294,24 @@ public class SnapshotScannerHDFSAclController implements MasterCoprocessor, Mast
   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
       String namespace) throws IOException {
     if (checkInitialized("deleteNamespace " + namespace)) {
-      // 1. Record namespace user acl is not synced to HDFS
-      SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(ctx.getEnvironment().getConnection(),
-        namespace);
-      // 2. Delete tmp namespace directory
-      /**
-       * Delete namespace tmp directory because it's created by this coprocessor when namespace is
-       * created to make namespace default acl can be inherited by tables. The namespace data
-       * directory is deleted by DeleteNamespaceProcedure, the namespace archive directory is
-       * deleted by HFileCleaner.
-       */
-      hdfsAclHelper.deleteEmptyDir(pathHelper.getTmpNsDir(namespace));
+      try (Table aclTable =
+          ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
+        // 1. Delete namespace archive dir default ACLs
+        Set<String> users = SnapshotScannerHDFSAclStorage.getEntryUsers(aclTable,
+          PermissionStorage.toNamespaceEntry(Bytes.toBytes(namespace)));
+        hdfsAclHelper.removeNamespaceDefaultAcl(namespace, users);
+        // 2. Record that the namespace user acl is no longer synced to HDFS
+        SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(ctx.getEnvironment().getConnection(),
+          namespace);
+        // 3. Delete tmp namespace directory
+        /**
+         * Delete the namespace tmp directory: this coprocessor creates it when the namespace is
+         * created so that tables can inherit the namespace default acl. The namespace data
+         * directory is deleted by DeleteNamespaceProcedure, and the namespace archive directory
+         * is deleted by HFileCleaner.
+         */
+        hdfsAclHelper.deleteEmptyDir(pathHelper.getTmpNsDir(namespace));
+      }
     }
   }
 
@@ -364,7 +375,9 @@ public class SnapshotScannerHDFSAclController implements MasterCoprocessor, Mast
             UserPermission tPerm = getUserTablePermission(conf, userName, tableName);
             if (tPerm != null && hdfsAclHelper.containReadAction(tPerm)) {
               if (!isHdfsAclSet(aclTable, userName, tableName)) {
-                // 1. Add HDFS acl
+                // 1. create table dirs
+                hdfsAclHelper.createTableDirectories(tableName);
+                // 2. Add HDFS acl
                 hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
               }
               // 2. Record table acl is synced to HDFS
@@ -547,13 +560,13 @@ public class SnapshotScannerHDFSAclController implements MasterCoprocessor, Mast
 
   private boolean needHandleTableHdfsAcl(TableName tableName, String operation) throws IOException {
     return !tableName.isSystemTable() && checkInitialized(operation) && hdfsAclHelper
-        .isTableUserScanSnapshotEnabled(masterServices.getTableDescriptors().get(tableName));
+        .isAclSyncToHdfsEnabled(masterServices.getTableDescriptors().get(tableName));
   }
 
   private boolean needHandleTableHdfsAcl(TableDescriptor tableDescriptor, String operation) {
     TableName tableName = tableDescriptor.getTableName();
     return !tableName.isSystemTable() && checkInitialized(operation)
-        && hdfsAclHelper.isTableUserScanSnapshotEnabled(tableDescriptor);
+        && hdfsAclHelper.isAclSyncToHdfsEnabled(tableDescriptor);
   }
 
   private User getActiveUser(ObserverContext<?> ctx) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java
index 60d9155..6cf1916 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java
@@ -28,6 +28,7 @@ import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -71,23 +73,23 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
 public class SnapshotScannerHDFSAclHelper implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclHelper.class);
 
-  public static final String USER_SCAN_SNAPSHOT_ENABLE = "hbase.user.scan.snapshot.enable";
-  public static final String USER_SCAN_SNAPSHOT_THREAD_NUMBER =
-      "hbase.user.scan.snapshot.thread.number";
+  public static final String ACL_SYNC_TO_HDFS_ENABLE = "hbase.acl.sync.to.hdfs.enable";
+  public static final String ACL_SYNC_TO_HDFS_THREAD_NUMBER =
+      "hbase.acl.sync.to.hdfs.thread.number";
   // The tmp directory to restore snapshot, it can not be a sub directory of HBase root dir
   public static final String SNAPSHOT_RESTORE_TMP_DIR = "hbase.snapshot.restore.tmp.dir";
   public static final String SNAPSHOT_RESTORE_TMP_DIR_DEFAULT =
       "/hbase/.tmpdir-to-restore-snapshot";
   // The default permission of the common directories if the feature is enabled.
   public static final String COMMON_DIRECTORY_PERMISSION =
-      "hbase.user.scan.snapshot.common.directory.permission";
+      "hbase.acl.sync.to.hdfs.common.directory.permission";
   // The secure HBase permission is 700, 751 means all others have execute access and the mask is
   // set to read-execute to make the extended access ACL entries can work. Be cautious to set
   // this value.
   public static final String COMMON_DIRECTORY_PERMISSION_DEFAULT = "751";
   // The default permission of the snapshot restore directories if the feature is enabled.
   public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION =
-      "hbase.user.scan.snapshot.restore.directory.permission";
+      "hbase.acl.sync.to.hdfs.restore.directory.permission";
   // 753 means all others have write-execute access.
   public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT = "753";
 
@@ -102,7 +104,7 @@ public class SnapshotScannerHDFSAclHelper implements Closeable {
     this.conf = configuration;
     this.pathHelper = new PathHelper(conf);
     this.fs = pathHelper.getFileSystem();
-    this.pool = Executors.newFixedThreadPool(conf.getInt(USER_SCAN_SNAPSHOT_THREAD_NUMBER, 10),
+    this.pool = Executors.newFixedThreadPool(conf.getInt(ACL_SYNC_TO_HDFS_THREAD_NUMBER, 10),
       new ThreadFactoryBuilder().setNameFormat("hdfs-acl-thread-%d").setDaemon(true).build());
     this.admin = connection.getAdmin();
   }
@@ -231,6 +233,50 @@ public class SnapshotScannerHDFSAclHelper implements Closeable {
   }
 
   /**
+   * Remove the default acl from the namespace archive dir when deleting a namespace
+   * @param namespace the namespace
+   * @param removeUsers the users whose default acl will be removed
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean removeNamespaceDefaultAcl(String namespace, Set<String> removeUsers) {
+    try {
+      long start = System.currentTimeMillis();
+      Path archiveNsDir = pathHelper.getArchiveNsDir(namespace);
+      HDFSAclOperation operation = new HDFSAclOperation(fs, archiveNsDir, removeUsers,
+          HDFSAclOperation.OperationType.REMOVE, false, HDFSAclOperation.AclType.DEFAULT);
+      operation.handleAcl();
+      LOG.info("Remove HDFS acl when delete namespace {}, cost {} ms", namespace,
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Remove HDFS acl error when delete namespace {}", namespace, e);
+      return false;
+    }
+  }
+
+  /**
+   * Remove the default acl from the table archive dir when deleting a table
+   * @param tableName the table name
+   * @param removeUsers the users whose default acl will be removed
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean removeTableDefaultAcl(TableName tableName, Set<String> removeUsers) {
+    try {
+      long start = System.currentTimeMillis();
+      Path archiveTableDir = pathHelper.getArchiveTableDir(tableName);
+      HDFSAclOperation operation = new HDFSAclOperation(fs, archiveTableDir, removeUsers,
+          HDFSAclOperation.OperationType.REMOVE, false, HDFSAclOperation.AclType.DEFAULT);
+      operation.handleAcl();
+      LOG.info("Remove HDFS acl when delete table {}, cost {} ms", tableName,
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Remove HDFS acl error when delete table {}", tableName, e);
+      return false;
+    }
+  }
+
+  /**
    * Add table user acls
    * @param tableName the table
    * @param users the table users with READ permission
@@ -349,7 +395,7 @@ public class SnapshotScannerHDFSAclHelper implements Closeable {
     Set<TableName> tables = new HashSet<>();
     for (String namespace : namespaces) {
       tables.addAll(admin.listTableDescriptorsByNamespace(Bytes.toBytes(namespace)).stream()
-          .filter(this::isTableUserScanSnapshotEnabled).map(TableDescriptor::getTableName)
+          .filter(this::isAclSyncToHdfsEnabled).map(TableDescriptor::getTableName)
           .collect(Collectors.toSet()));
     }
     handleTableAcl(tables, users, skipNamespaces, skipTables, operationType);
@@ -403,7 +449,7 @@ public class SnapshotScannerHDFSAclHelper implements Closeable {
    * return paths that user will global permission will visit
    * @return the path list
    */
-  private List<Path> getGlobalRootPaths() {
+  List<Path> getGlobalRootPaths() {
     return Lists.newArrayList(pathHelper.getTmpDataDir(), pathHelper.getDataDir(),
       pathHelper.getMobDataDir(), pathHelper.getArchiveDataDir(), pathHelper.getSnapshotRootDir());
   }
@@ -511,9 +557,20 @@ public class SnapshotScannerHDFSAclHelper implements Closeable {
     return !tablePermission.hasFamily() && !tablePermission.hasQualifier();
   }
 
-  boolean isTableUserScanSnapshotEnabled(TableDescriptor tableDescriptor) {
+  public static boolean isAclSyncToHdfsEnabled(Configuration conf) {
+    String[] masterCoprocessors = conf.getStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
+    Set<String> masterCoprocessorSet = new HashSet<>();
+    if (masterCoprocessors != null) {
+      Collections.addAll(masterCoprocessorSet, masterCoprocessors);
+    }
+    return conf.getBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, false)
+        && masterCoprocessorSet.contains(SnapshotScannerHDFSAclController.class.getName())
+        && masterCoprocessorSet.contains(AccessController.class.getName());
+  }
+
+  boolean isAclSyncToHdfsEnabled(TableDescriptor tableDescriptor) {
     return tableDescriptor == null ? false
-        : Boolean.valueOf(tableDescriptor.getValue(USER_SCAN_SNAPSHOT_ENABLE));
+        : Boolean.valueOf(tableDescriptor.getValue(ACL_SYNC_TO_HDFS_ENABLE));
   }
 
   PathHelper getPathHelper() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java
index 69a4834..321b1d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.security.access;
 
 import static org.apache.hadoop.hbase.security.access.Permission.Action.READ;
 import static org.apache.hadoop.hbase.security.access.Permission.Action.WRITE;
+import static org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclController.SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl;
+import static org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclController.SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl;
+import static org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclController.SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -57,7 +60,6 @@ import org.apache.hadoop.hbase.testclassification.SecurityTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -85,6 +87,7 @@ public class TestSnapshotScannerHDFSAclController {
   private static Path rootDir = null;
   private static User unGrantUser = null;
   private static SnapshotScannerHDFSAclHelper helper;
+  private static Table aclTable;
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
@@ -92,7 +95,7 @@ public class TestSnapshotScannerHDFSAclController {
     conf.setBoolean("dfs.namenode.acls.enabled", true);
     conf.set("fs.permissions.umask-mode", "027");
     // enable hbase hdfs acl feature
-    conf.setBoolean(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, true);
+    conf.setBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, true);
     // enable secure
     conf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
     conf.set(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR,
@@ -102,9 +105,6 @@ public class TestSnapshotScannerHDFSAclController {
     conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
       conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) + ","
           + SnapshotScannerHDFSAclController.class.getName());
-    // set hfile cleaner plugin
-    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
-      SnapshotScannerHDFSAclCleaner.class.getName());
 
     TEST_UTIL.startMiniCluster();
     admin = TEST_UTIL.getAdmin();
@@ -140,6 +140,7 @@ public class TestSnapshotScannerHDFSAclController {
     SnapshotScannerHDFSAclController coprocessor = TEST_UTIL.getHBaseCluster().getMaster()
         .getMasterCoprocessorHost().findCoprocessor(SnapshotScannerHDFSAclController.class);
     TEST_UTIL.waitFor(1200000, () -> coprocessor.checkInitialized("check initialized"));
+    aclTable = admin.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME);
   }
 
   @AfterClass
@@ -148,112 +149,185 @@ public class TestSnapshotScannerHDFSAclController {
   }
 
   @Test
-  public void testGrantGlobal() throws Exception {
+  public void testGrantGlobal1() throws Exception {
     final String grantUserName = name.getMethodName();
     User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot1 = namespace + "s1";
+    String snapshot2 = namespace + "s2";
 
-    String namespace1 = name.getMethodName();
-    String namespace2 = namespace1 + "2";
-    String namespace3 = namespace1 + "3";
-    TableName table1 = TableName.valueOf(namespace1, "t1");
-    TableName table12 = TableName.valueOf(namespace1, "t2");
-    TableName table21 = TableName.valueOf(namespace2, "t21");
-    TableName table3 = TableName.valueOf(namespace3, "t3");
-    TableName table31 = TableName.valueOf(namespace3, "t31");
-    String snapshot1 = namespace1 + "t1";
-    String snapshot12 = namespace1 + "t12";
-    String snapshot2 = namespace1 + "t2";
-    String snapshot21 = namespace2 + "t21";
-    String snapshot3 = namespace1 + "t3";
-    String snapshot31 = namespace1 + "t31";
-
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
-    admin.snapshot(snapshot1, table1);
-
-    // case 1: grant G(R) -> grant G(W) -> grant G(R)
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    admin.snapshot(snapshot1, table);
+    // grant G(R)
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    assertTrue(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+    // grant G(W) with merging existing permissions
     admin.grant(
       new UserPermission(grantUserName, Permission.newBuilder().withActions(WRITE).build()), true);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    assertTrue(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+    // grant G(W) without merging
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, WRITE);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
+    assertFalse(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+    // grant G(R)
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
-    admin.snapshot(snapshot12, table1);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot12, 6);
+    // take a snapshot and ACLs are inherited automatically
+    admin.snapshot(snapshot2, table);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
+    assertTrue(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+  }
+
+  @Test
+  public void testGrantGlobal2() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace1 = name.getMethodName();
+    TableName table1 = TableName.valueOf(namespace1, "t1");
+    String namespace2 = namespace1 + "2";
+    TableName table2 = TableName.valueOf(namespace2, "t2");
+    String snapshot1 = namespace1 + "s1";
+    String snapshot2 = namespace2 + "s2";
 
-    // case 2: grant G(R),N(R) -> G(W)
+    // grant G(R), grant namespace1(R)
+    SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    // create table in namespace1 and snapshot
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
+    admin.snapshot(snapshot1, table1);
     admin.grant(new UserPermission(grantUserName,
         Permission.newBuilder(namespace1).withActions(READ).build()),
       false);
+    // grant G(W)
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, WRITE);
-    // table in ns1
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table12);
-    admin.snapshot(snapshot2, table12);
-    // table in ns2
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table21);
-    admin.snapshot(snapshot21, table21);
+    // create table in namespace2 and snapshot
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
+    admin.snapshot(snapshot2, table2);
+    // check scan snapshot
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot21, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
+    assertFalse(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+    assertTrue(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace1));
+    assertFalse(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace2));
+    checkUserAclEntry(helper.getGlobalRootPaths(), grantUserName, false, false);
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace1), grantUserName, true, true);
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace2), grantUserName, false, false);
+  }
 
-    // case 3: grant G(R),T(R) -> G(W)
+  @Test
+  public void testGrantGlobal3() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace = name.getMethodName();
+    TableName table1 = TableName.valueOf(namespace, "t1");
+    TableName table2 = TableName.valueOf(namespace, "t2");
+    String snapshot1 = namespace + "s1";
+    String snapshot2 = namespace + "s2";
+    // grant G(R)
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
-    admin.snapshot(snapshot3, table3);
-    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table3, READ);
+    // grant table1(R)
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
+    admin.snapshot(snapshot1, table1);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, READ);
+    // grant G(W)
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, WRITE);
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table31);
-    admin.snapshot(snapshot31, table31);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, 6);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot31, -1);
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
+    admin.snapshot(snapshot2, table2);
+    // check scan snapshot
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
+    assertFalse(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+    assertFalse(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, table1));
+    assertFalse(hasUserTableHdfsAcl(aclTable, grantUserName, table2));
+    checkUserAclEntry(helper.getGlobalRootPaths(), grantUserName, false, false);
+    checkUserAclEntry(helper.getTableRootPaths(table2, false), grantUserName, false, false);
+    checkUserAclEntry(helper.getTableRootPaths(table1, false), grantUserName, true, true);
   }
 
   @Test
-  public void testGrantNamespace() throws Exception {
+  public void testGrantNamespace1() throws Exception {
     final String grantUserName = name.getMethodName();
     User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
-
     String namespace = name.getMethodName();
-    TableName table = TableName.valueOf(namespace, "t1");
+    TableName table1 = TableName.valueOf(namespace, "t1");
     TableName table2 = TableName.valueOf(namespace, "t2");
-    TableName table3 = TableName.valueOf(namespace, "t3");
-    String snapshot = namespace + "t1";
-    String snapshot2 = namespace + "t2";
-    String snapshot3 = namespace + "t3";
-
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
-    admin.snapshot(snapshot, table);
+    String snapshot1 = namespace + "s1";
+    String snapshot2 = namespace + "s2";
 
-    // case 1: grant N(R) -> grant N(W) -> grant N(R)
+    // create table1 and snapshot
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
+    admin.snapshot(snapshot1, table1);
+    // grant N(R)
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
-    admin.snapshot(snapshot3, table3);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, 6);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, unGrantUser, snapshot, -1);
+    // create table2 and snapshot, ACLs can be inherited automatically
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
+    admin.snapshot(snapshot2, table2);
+    // check scan snapshot
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, unGrantUser, snapshot1, -1);
+    assertTrue(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    assertFalse(hasUserTableHdfsAcl(aclTable, grantUserName, table1));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, true, true);
+    // grant N(W)
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, WRITE);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
-    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
+    assertFalse(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, false, false);
+  }
+
+  @Test
+  public void testGrantNamespace2() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace = name.getMethodName();
+    TableName table1 = TableName.valueOf(namespace, "t1");
+    String snapshot1 = namespace + "s1";
+
+    // create table1 and snapshot
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
+    admin.snapshot(snapshot1, table1);
 
-    // case 2: grant T(R) -> N(W)
+    // grant N(R)
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
-    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
+    // grant table1(R)
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, READ);
+    // grant N(W)
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, WRITE);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
-    admin.snapshot(snapshot2, table2);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
-    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, WRITE);
+    // check scan snapshot
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
+    assertFalse(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, true, false);
+    assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, table1));
+    checkUserAclEntry(helper.getTableRootPaths(table1, false), grantUserName, true, true);
+  }
+
+  @Test
+  public void testGrantNamespace3() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot = namespace + "t1";
 
-    // case 3: grant G(R) -> N(W)
+    // create table and snapshot
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    admin.snapshot(snapshot, table);
+    // grant namespace(R)
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    // grant global(R)
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    // grant namespace(W)
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, WRITE);
+    // check scan snapshot
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, 6);
+    assertFalse(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, true, true);
+    assertTrue(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+    checkUserAclEntry(helper.getGlobalRootPaths(), grantUserName, true, true);
   }
 
   @Test
@@ -262,171 +336,244 @@ public class TestSnapshotScannerHDFSAclController {
     User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
 
     String namespace = name.getMethodName();
-    TableName table = TableName.valueOf(namespace, "t1");
-    TableName table2 = TableName.valueOf(namespace, "t2");
-    String snapshot = namespace + "t1";
-    String snapshot2 = namespace + "t1-2";
-    String snapshot3 = namespace + "t2";
+    TableName table1 = TableName.valueOf(namespace, "t1");
+    String snapshot1 = namespace + "s1";
+    String snapshot2 = namespace + "s2";
 
-    try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, table)) {
+    try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, table1)) {
       TestHDFSAclHelper.put(t);
-      admin.snapshot(snapshot, table);
+      admin.snapshot(snapshot1, table1);
       // table owner can scan table snapshot
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL,
-        User.createUserForTesting(conf, "owner", new String[] {}), snapshot, 6);
-      // case 1: grant table family(R)
-      SecureTestUtil.grantOnTable(TEST_UTIL, grantUserName, table, TestHDFSAclHelper.COLUMN1, null,
+        User.createUserForTesting(conf, "owner", new String[] {}), snapshot1, 6);
+      // grant table1 family(R)
+      SecureTestUtil.grantOnTable(TEST_UTIL, grantUserName, table1, TestHDFSAclHelper.COLUMN1, null,
         READ);
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
-      // case 2: grant T(R)
-      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
+
+      // grant table1(R)
+      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, READ);
       TestHDFSAclHelper.put2(t);
-      admin.snapshot(snapshot2, table);
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+      admin.snapshot(snapshot2, table1);
+      // check scan snapshot
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 10);
+      assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, table1));
+      checkUserAclEntry(helper.getTableRootPaths(table1, false), grantUserName, true, true);
     }
-    // create t2 and snapshot
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
-    admin.snapshot(snapshot3, table2);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
 
-    // case 3: grant T(R) -> grant T(W) with merging existing permissions
-    TEST_UTIL.getAdmin().grant(
-      new UserPermission(grantUserName, Permission.newBuilder(table).withActions(WRITE).build()),
+    // grant table1(W) with merging existing permissions
+    admin.grant(
+      new UserPermission(grantUserName, Permission.newBuilder(table1).withActions(WRITE).build()),
       true);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+    assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, table1));
+    checkUserAclEntry(helper.getTableRootPaths(table1, false), grantUserName, true, true);
 
-    // case 4: grant T(R) -> grant T(W) without merging existing permissions
-    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, WRITE);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
+    // grant table1(W) without merging existing permissions
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, WRITE);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
+    assertFalse(hasUserTableHdfsAcl(aclTable, grantUserName, table1));
+    checkUserAclEntry(helper.getTableRootPaths(table1, false), grantUserName, false, false);
   }
 
   @Test
-  public void testRevokeGlobal() throws Exception {
+  public void testGrantMobTable() throws Exception {
     final String grantUserName = name.getMethodName();
     User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot = namespace + "s1";
+
+    try (Table t = TestHDFSAclHelper.createMobTable(TEST_UTIL, table)) {
+      TestHDFSAclHelper.put(t);
+      admin.snapshot(snapshot, table);
+      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+      assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, table));
+      checkUserAclEntry(helper.getTableRootPaths(table, false), grantUserName, true, true);
+    }
+  }
 
+  @Test
+  public void testRevokeGlobal1() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
     String namespace = name.getMethodName();
     TableName table1 = TableName.valueOf(namespace, "t1");
-    TableName table2 = TableName.valueOf(namespace, "t2");
-    TableName table3 = TableName.valueOf(namespace, "t3");
     String snapshot1 = namespace + "t1";
-    String snapshot2 = namespace + "t2";
-    String snapshot3 = namespace + "t3";
 
     TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
     admin.snapshot(snapshot1, table1);
-    // case 1: grant G(R) -> revoke G(R)
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
     SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
+    assertFalse(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+    checkUserAclEntry(helper.getGlobalRootPaths(), grantUserName, false, false);
+  }
 
-    // case 2: grant G(R), grant N(R), grant T(R) -> revoke G(R)
+  @Test
+  public void testRevokeGlobal2() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+
+    String namespace = name.getMethodName();
+    TableName table1 = TableName.valueOf(namespace, "t1");
+    String snapshot1 = namespace + "s1";
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
+    admin.snapshot(snapshot1, table1);
+
+    // grant G(R), grant N(R), grant T(R) -> revoke G(R)
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
     TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, READ);
     SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
+    // check scan snapshot
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
-    admin.snapshot(snapshot2, table2);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
-    SecureTestUtil.revokeFromNamespace(TEST_UTIL, grantUserName, namespace, READ);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
+    assertFalse(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+    checkUserAclEntry(helper.getGlobalRootPaths(), grantUserName, false, false);
+    assertTrue(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, true, true);
+  }
+
+  @Test
+  public void testRevokeGlobal3() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+
+    String namespace = name.getMethodName();
+    TableName table1 = TableName.valueOf(namespace, "t1");
+    String snapshot1 = namespace + "t1";
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
+    admin.snapshot(snapshot1, table1);
 
-    // case 3: grant G(R), grant T(R) -> revoke G(R)
+    // grant G(R), grant T(R) -> revoke G(R)
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, READ);
     SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
+    // check scan snapshot
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
-    admin.snapshot(snapshot3, table3);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
+    assertFalse(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+    checkUserAclEntry(helper.getGlobalRootPaths(), grantUserName, false, false);
+    assertFalse(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, true, false);
+    assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, table1));
+    checkUserAclEntry(helper.getTableRootPaths(table1, false), grantUserName, true, true);
   }
 
   @Test
-  public void testRevokeNamespace() throws Exception {
+  public void testRevokeNamespace1() throws Exception {
     String grantUserName = name.getMethodName();
     User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
-
     String namespace = name.getMethodName();
     TableName table1 = TableName.valueOf(namespace, "t1");
-    TableName table2 = TableName.valueOf(namespace, "t2");
-    TableName table3 = TableName.valueOf(namespace, "t3");
-    TableName table4 = TableName.valueOf(namespace, "t4");
-    String snapshot1 = namespace + "t1";
-    String snapshot2 = namespace + "t2";
-    String snapshot3 = namespace + "t3";
-    String snapshot4 = namespace + "t4";
-
+    String snapshot1 = namespace + "s1";
     TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
     admin.snapshot(snapshot1, table1);
 
-    // case 1: grant N(R) -> revoke N(R)
+    // grant N(R) -> revoke N(R)
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
     admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(namespace).build()));
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
-    admin.snapshot(snapshot3, table3);
+    // check scan snapshot
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
+    assertFalse(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, false, false);
 
-    // case 2: grant N(R), grant G(R) -> revoke N(R)
+    // grant N(R), grant G(R) -> revoke N(R)
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
     admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(namespace).build()));
-    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table4);
-    admin.snapshot(snapshot4, table4);
+    // check scan snapshot
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot4, 6);
-    SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
+    assertFalse(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, true, true);
+  }
 
-    // case 3: grant N(R), grant T(R) -> revoke N(R)
+  @Test
+  public void testRevokeNamespace2() throws Exception {
+    String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot = namespace + "s1";
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    admin.snapshot(snapshot, table);
+
+    // grant N(R), grant T(R) -> revoke N(R)
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
-    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, READ);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
     SecureTestUtil.revokeFromNamespace(TEST_UTIL, grantUserName, namespace, READ);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
-    TestHDFSAclHelper.createTable(TEST_UTIL, table2);
-    admin.snapshot(snapshot2, table2);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
+    // check scan snapshot
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+    assertFalse(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, true, false);
+    assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, table));
+    checkUserAclEntry(helper.getTableRootPaths(table, false), grantUserName, true, true);
   }
 
   @Test
-  public void testRevokeTable() throws Exception {
+  public void testRevokeTable1() throws Exception {
     final String grantUserName = name.getMethodName();
     User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
-
     String namespace = name.getMethodName();
     TableName table = TableName.valueOf(namespace, "t1");
     String snapshot = namespace + "t1";
-
     TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
     admin.snapshot(snapshot, table);
 
-    // case 1: grant T(R) -> revoke table family
+    // grant T(R) -> revoke table family
     TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
     SecureTestUtil.revokeFromTable(TEST_UTIL, grantUserName, table, TestHDFSAclHelper.COLUMN1, null,
       READ);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
 
-    // case 2: grant T(R) -> revoke T(R)
+    // grant T(R) -> revoke T(R)
     TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
     admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(table).build()));
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
+    assertFalse(hasUserTableHdfsAcl(aclTable, grantUserName, table));
+    checkUserAclEntry(helper.getTableRootPaths(table, false), grantUserName, false, false);
+  }
+
+  @Test
+  public void testRevokeTable2() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot = namespace + "t1";
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    admin.snapshot(snapshot, table);
 
-    // case 3: grant T(R), grant N(R) -> revoke T(R)
+    // grant T(R), grant N(R) -> revoke T(R)
     TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
     SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
     admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(table).build()));
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
-    SecureTestUtil.revokeFromNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    assertFalse(hasUserTableHdfsAcl(aclTable, grantUserName, table));
+    checkUserAclEntry(helper.getTableRootPaths(table, false), grantUserName, true, true);
+    assertTrue(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, true, true);
+  }
+
+  @Test
+  public void testRevokeTable3() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot = namespace + "t1";
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    admin.snapshot(snapshot, table);
 
-    // case 4: grant T(R), grant G(R) -> revoke T(R)
+    // grant T(R), grant G(R) -> revoke T(R)
     TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
     SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
     admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(table).build()));
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
-    SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
+    assertFalse(hasUserTableHdfsAcl(aclTable, grantUserName, table));
+    checkUserAclEntry(helper.getTableRootPaths(table, false), grantUserName, true, true);
+    assertTrue(hasUserGlobalHdfsAcl(aclTable, grantUserName));
+    checkUserAclEntry(helper.getGlobalRootPaths(), grantUserName, true, true);
   }
 
   @Test
@@ -438,8 +585,8 @@ public class TestSnapshotScannerHDFSAclController {
 
     String namespace = name.getMethodName();
     TableName tableName = TableName.valueOf(namespace, "t1");
-    String snapshot = namespace + "t1";
-    String snapshot2 = namespace + "t1-2";
+    String snapshot = namespace + "s1";
+    String snapshot2 = namespace + "s2";
     try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, tableName)) {
       TestHDFSAclHelper.put(t);
       // snapshot
@@ -454,10 +601,16 @@ public class TestSnapshotScannerHDFSAclController {
       TestHDFSAclHelper.put2(t);
       // snapshot
       admin.snapshot(snapshot2, tableName);
+      // check scan snapshot
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser2, snapshot, 6);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 9);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser2, snapshot2, 9);
+      assertTrue(hasUserNamespaceHdfsAcl(aclTable, grantUserName2, namespace));
+      checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName2, true, true);
+      assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, tableName));
+      checkUserAclEntry(helper.getTableRootPaths(tableName, false), grantUserName, true, true);
+      checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName, true, false);
     }
   }
 
@@ -467,9 +620,9 @@ public class TestSnapshotScannerHDFSAclController {
     User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
     String namespace = name.getMethodName();
     TableName table = TableName.valueOf(namespace, "t1");
-    String snapshot = namespace + "t1";
-    String snapshot2 = namespace + "t1-2";
-    String snapshot3 = namespace + "t1-3";
+    String snapshot = namespace + "s1";
+    String snapshot2 = namespace + "s2";
+    String snapshot3 = namespace + "s3";
 
     try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, table)) {
       TestHDFSAclHelper.put(t);
@@ -488,6 +641,8 @@ public class TestSnapshotScannerHDFSAclController {
       admin.snapshot(snapshot2, table);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 10);
+      assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, table));
+      checkUserAclEntry(helper.getTableRootPaths(table, false), grantUserName, true, true);
 
       // delete
       admin.disableTable(table);
@@ -499,6 +654,10 @@ public class TestSnapshotScannerHDFSAclController {
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
+      assertFalse(hasUserTableHdfsAcl(aclTable, grantUserName, table));
+      checkUserAclEntry(helper.getPathHelper().getDataTableDir(table), grantUserName, false, false);
+      checkUserAclEntry(helper.getPathHelper().getArchiveTableDir(table), grantUserName, true,
+        false);
     }
   }
 
@@ -507,57 +666,62 @@ public class TestSnapshotScannerHDFSAclController {
     String namespace = name.getMethodName();
     String grantUserName1 = namespace + "1";
     String grantUserName2 = namespace + "2";
-    String grantUserName3 = namespace + "3";
     User grantUser1 = User.createUserForTesting(conf, grantUserName1, new String[] {});
     User grantUser2 = User.createUserForTesting(conf, grantUserName2, new String[] {});
-    User grantUser3 = User.createUserForTesting(conf, grantUserName3, new String[] {});
-
-    TableName tableName1 = TableName.valueOf(namespace, "t1");
-    TableName tableName2 = TableName.valueOf(namespace, "t2");
+    TableName table = TableName.valueOf(namespace, "t1");
     String snapshot1 = namespace + "t1";
-    String snapshot2 = namespace + "t2";
-    try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, tableName1);
-        Table t2 = TestHDFSAclHelper.createTable(TEST_UTIL, tableName2)) {
-      TestHDFSAclHelper.put(t);
-      TestHDFSAclHelper.put(t2);
-      // snapshot
-      admin.snapshot(snapshot1, tableName1);
-      admin.snapshot(snapshot2, tableName2);
-      // grant user table permission
-      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName1, tableName1, READ);
-      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName2, tableName2, READ);
-      SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName3, namespace, READ);
-      // delete table
-      admin.disableTable(tableName1);
-      admin.deleteTable(tableName1);
-      // grantUser2 and grantUser3 should have data/ns acl
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser1, snapshot1, -1);
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser2, snapshot2, 6);
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser3, snapshot2, 6);
-    }
+
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    // snapshot
+    admin.snapshot(snapshot1, table);
+    // grant user table permission
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName1, table, READ);
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName2, namespace, READ);
+    // delete table
+    admin.disableTable(table);
+    admin.deleteTable(table);
+    // grantUser1 loses the table acl; grantUser2 still has the namespace acl
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser1, snapshot1, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser2, snapshot1, 6);
+    assertTrue(hasUserNamespaceHdfsAcl(aclTable, grantUserName2, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), grantUserName2, true, true);
+    assertFalse(hasUserTableHdfsAcl(aclTable, grantUserName1, table));
+    checkUserAclEntry(helper.getPathHelper().getDataTableDir(table), grantUserName1, false, false);
+    checkUserAclEntry(helper.getPathHelper().getMobTableDir(table), grantUserName1, false, false);
+    checkUserAclEntry(helper.getPathHelper().getArchiveTableDir(table), grantUserName1, true,
+      false);
+
+    // check tmp table directory does not exist
+    Path tmpTableDir = helper.getPathHelper().getTmpTableDir(table);
+    assertFalse(fs.exists(tmpTableDir));
   }
 
   @Test
   public void testDeleteNamespace() throws Exception {
     String grantUserName = name.getMethodName();
     User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
-
     String namespace = name.getMethodName();
-    TableName tableName = TableName.valueOf(namespace, "t1");
+    TableName table = TableName.valueOf(namespace, "t1");
     String snapshot = namespace + "t1";
-    try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, tableName)) {
-      TestHDFSAclHelper.put(t);
-      // snapshot
-      admin.snapshot(snapshot, tableName);
-      // grant user2 namespace permission
-      SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
-      // truncate table
-      admin.disableTable(tableName);
-      admin.deleteTable(tableName);
-      // snapshot
-      admin.deleteNamespace(namespace);
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
-    }
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    // snapshot
+    admin.snapshot(snapshot, table);
+    // grant namespace permission
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    // delete table
+    admin.disableTable(table);
+    admin.deleteTable(table);
+    // delete namespace
+    admin.deleteNamespace(namespace);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+    assertFalse(hasUserNamespaceHdfsAcl(aclTable, grantUserName, namespace));
+    checkUserAclEntry(helper.getPathHelper().getArchiveNsDir(namespace), grantUserName, true,
+      false);
+
+    // check tmp and data namespace dirs do not exist
+    assertFalse(fs.exists(helper.getPathHelper().getTmpNsDir(namespace)));
+    assertFalse(fs.exists(helper.getPathHelper().getDataNsDir(namespace)));
+    // assertFalse(fs.exists(helper.getPathHelper().getMobDataNsDir(namespace)));
   }
 
   @Test
@@ -578,6 +742,7 @@ public class TestSnapshotScannerHDFSAclController {
     cleaner.choreForTesting();
     Path archiveTableDir = HFileArchiveUtil.getTableArchivePath(rootDir, table);
     assertTrue(fs.exists(archiveTableDir));
+    checkUserAclEntry(helper.getTableRootPaths(table, false), grantUserName, true, true);
 
     // Check SnapshotScannerHDFSAclCleaner method
     assertTrue(SnapshotScannerHDFSAclCleaner.isArchiveTableDir(archiveTableDir));
@@ -589,29 +754,11 @@ public class TestSnapshotScannerHDFSAclController {
   }
 
   @Test
-  public void testGrantMobTable() throws Exception {
-    final String grantUserName = name.getMethodName();
-    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
-
+  public void testModifyTable1() throws Exception {
     String namespace = name.getMethodName();
     TableName table = TableName.valueOf(namespace, "t1");
     String snapshot = namespace + "t1";
 
-    try (Table t = TestHDFSAclHelper.createMobTable(TEST_UTIL, table)) {
-      TestHDFSAclHelper.put(t);
-      admin.snapshot(snapshot, table);
-      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
-    }
-  }
-
-  @Test
-  public void testModifyTable() throws Exception {
-    String namespace = name.getMethodName();
-    TableName table = TableName.valueOf(namespace, "t1");
-    String snapshot = namespace + "t1";
-    TableName table2 = TableName.valueOf(namespace, "t2");
-
     String tableUserName = name.getMethodName();
     User tableUser = User.createUserForTesting(conf, tableUserName, new String[] {});
     String tableUserName2 = tableUserName + "2";
@@ -647,42 +794,121 @@ public class TestSnapshotScannerHDFSAclController {
 
     // enable user scan snapshot
     admin.modifyTable(TableDescriptorBuilder.newBuilder(td)
-        .setValue(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, "true").build());
+        .setValue(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, "true").build());
+    // check scan snapshot
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser, snapshot, 6);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser2, snapshot, -1);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser3, snapshot, -1);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, nsUser, snapshot, 6);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, globalUser, snapshot, 6);
-    // disable user scan snapshot
+    // check acl table storage and ACLs in dirs
+    assertTrue(hasUserGlobalHdfsAcl(aclTable, globalUserName));
+    checkUserAclEntry(helper.getGlobalRootPaths(), globalUserName, true, true);
+    assertTrue(hasUserNamespaceHdfsAcl(aclTable, nsUserName, namespace));
+    checkUserAclEntry(helper.getNamespaceRootPaths(namespace), nsUserName, true, true);
+    assertTrue(hasUserTableHdfsAcl(aclTable, tableUserName, table));
+    checkUserAclEntry(helper.getTableRootPaths(table, false), tableUserName, true, true);
+    for (String user : new String[] { tableUserName2, tableUserName3 }) {
+      assertFalse(hasUserTableHdfsAcl(aclTable, user, table));
+      checkUserAclEntry(helper.getTableRootPaths(table, false), user, false, false);
+    }
+  }
+
+  @Test
+  public void testModifyTable2() throws Exception {
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot = namespace + "t1";
+    TableName table2 = TableName.valueOf(namespace, "t2");
+
+    String tableUserName = name.getMethodName();
+    User tableUser = User.createUserForTesting(conf, tableUserName, new String[] {});
+    String tableUserName2 = tableUserName + "2";
+    User tableUser2 = User.createUserForTesting(conf, tableUserName2, new String[] {});
+    String tableUserName3 = tableUserName + "3";
+    User tableUser3 = User.createUserForTesting(conf, tableUserName3, new String[] {});
+    String nsUserName = tableUserName + "-ns";
+    User nsUser = User.createUserForTesting(conf, nsUserName, new String[] {});
+    String globalUserName = tableUserName + "-global";
+    User globalUser = User.createUserForTesting(conf, globalUserName, new String[] {});
+    String globalUserName2 = tableUserName + "-global-2";
+    User globalUser2 = User.createUserForTesting(conf, globalUserName2, new String[] {});
+
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    SecureTestUtil.grantGlobal(TEST_UTIL, globalUserName, READ);
+    SecureTestUtil.grantGlobal(TEST_UTIL, globalUserName2, READ);
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, nsUserName, namespace, READ);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, tableUserName, table, READ);
+    SecureTestUtil.grantOnTable(TEST_UTIL, tableUserName2, table, TestHDFSAclHelper.COLUMN1, null,
+      READ);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, tableUserName3, table, WRITE);
+
     SecureTestUtil.grantOnNamespace(TEST_UTIL, tableUserName2, namespace, READ);
     TestHDFSAclHelper.createTable(TEST_UTIL, table2);
     TestHDFSAclHelper.grantOnTable(TEST_UTIL, tableUserName3, table2, READ);
-    admin.modifyTable(TableDescriptorBuilder.newBuilder(td)
-        .setValue(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, "false").build());
+    // disable user scan snapshot
+    admin.modifyTable(TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
+        .setValue(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, "false").build());
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser, snapshot, -1);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser2, snapshot, -1);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser3, snapshot, -1);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, nsUser, snapshot, -1);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, globalUser, snapshot, -1);
     TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, globalUser2, snapshot, -1);
-    List<Path> namespaceRootPaths = helper.getNamespaceRootPaths(namespace);
-    List<Path> tableRootPaths = helper.getTableRootPaths(table, false);
     // check access
-    for (Path path : tableRootPaths) {
-      checkUserAclEntry(path, tableUserName, false, false);
-      checkUserAclEntry(path, tableUserName2, false, false);
-      checkUserAclEntry(path, tableUserName3, false, false);
-      checkUserAclEntry(path, nsUserName, false, false);
-      checkUserAclEntry(path, globalUserName, false, false);
-      checkUserAclEntry(path, globalUserName2, false, false);
+    String[] users = new String[] { globalUserName, globalUserName2, nsUserName, tableUserName,
+      tableUserName2, tableUserName3 };
+    for (Path path : helper.getTableRootPaths(table, false)) {
+      for (String user : users) {
+        checkUserAclEntry(path, user, false, false);
+      }
     }
-    for (Path path : namespaceRootPaths) {
+    String[] nsUsers = new String[] { globalUserName, globalUserName2, nsUserName };
+    for (Path path : helper.getNamespaceRootPaths(namespace)) {
       checkUserAclEntry(path, tableUserName, false, false);
       checkUserAclEntry(path, tableUserName2, true, true);
       checkUserAclEntry(path, tableUserName3, true, false);
-      checkUserAclEntry(path, nsUserName, true, true);
-      checkUserAclEntry(path, globalUserName, true, true);
-      checkUserAclEntry(path, globalUserName2, true, true);
+      for (String user : nsUsers) {
+        checkUserAclEntry(path, user, true, true);
+      }
+    }
+    assertTrue(hasUserNamespaceHdfsAcl(aclTable, nsUserName, namespace));
+    assertTrue(hasUserNamespaceHdfsAcl(aclTable, tableUserName2, namespace));
+    assertFalse(hasUserTableHdfsAcl(aclTable, tableUserName, table));
+  }
+
+  @Test
+  public void testRestartMaster() throws Exception {
+    final String grantUserName = name.getMethodName();
+    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot = namespace + "t1";
+    admin.createNamespace(NamespaceDescriptor.create(namespace).build());
+
+    // grant N(R)
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
+    // restart cluster and tmp directory will not be deleted
+    TEST_UTIL.getMiniHBaseCluster().shutdown();
+    TEST_UTIL.restartHBaseCluster(1);
+    TEST_UTIL.waitUntilNoRegionsInTransition();
+
+    Path tmpNsDir = helper.getPathHelper().getTmpNsDir(namespace);
+    assertFalse(fs.exists(tmpNsDir));
+
+    // create table and snapshot
+    TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
+    admin = TEST_UTIL.getAdmin();
+    aclTable = TEST_UTIL.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME);
+    admin.snapshot(snapshot, table);
+    // TODO: -1 is asserted for now even though N(R) was granted above; fix in another patch
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
+  }
+
+  private void checkUserAclEntry(List<Path> paths, String user, boolean requireAccessAcl,
+      boolean requireDefaultAcl) throws Exception {
+    for (Path path : paths) {
+      checkUserAclEntry(path, user, requireAccessAcl, requireDefaultAcl);
     }
   }
 
@@ -703,8 +929,8 @@ public class TestSnapshotScannerHDFSAclController {
       }
     }
     String message = "require user: " + userName + ", path: " + path.toString() + " acl";
-    Assert.assertEquals(message, requireAccessAcl, accessAclEntry);
-    Assert.assertEquals(message, requireDefaultAcl, defaultAclEntry);
+    assertEquals(message, requireAccessAcl, accessAclEntry);
+    assertEquals(message, requireDefaultAcl, defaultAclEntry);
   }
 }
 
@@ -730,7 +956,7 @@ final class TestHDFSAclHelper {
   static Table createTable(HBaseTestingUtility util, TableName tableName) throws IOException {
     createNamespace(util, tableName.getNamespaceAsString());
     TableDescriptor td = getTableDescriptorBuilder(util, tableName)
-        .setValue(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, "true").build();
+        .setValue(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, "true").build();
     byte[][] splits = new byte[][] { Bytes.toBytes("2"), Bytes.toBytes("4") };
     return util.createTable(td, splits);
   }
@@ -743,7 +969,7 @@ final class TestHDFSAclHelper {
         .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN2).setMobEnabled(true)
             .setMobThreshold(0).build())
         .setOwner(User.createUserForTesting(util.getConfiguration(), "owner", new String[] {}))
-        .setValue(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, "true").build();
+        .setValue(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, "true").build();
     byte[][] splits = new byte[][] { Bytes.toBytes("2"), Bytes.toBytes("4") };
     return util.createTable(td, splits);
   }