You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/03/28 09:57:12 UTC

[hbase] 39/49: HBASE-21911 Move getUserPermissions from regionserver to master

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 81ee1dbaa1ad49ae7532bf07199ad6941f3581fd
Author: meiyi <my...@gamil.com>
AuthorDate: Mon Mar 25 10:35:27 2019 +0800

    HBASE-21911 Move getUserPermissions from regionserver to master
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../java/org/apache/hadoop/hbase/client/Admin.java |  11 ++
 .../org/apache/hadoop/hbase/client/AsyncAdmin.java |  10 ++
 .../hadoop/hbase/client/AsyncHBaseAdmin.java       |   7 ++
 .../hbase/client/ConnectionImplementation.java     |   8 ++
 .../org/apache/hadoop/hbase/client/HBaseAdmin.java |  20 +++
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |  17 +++
 .../hbase/client/ShortCircuitMasterConnection.java |   8 ++
 .../hbase/security/access/AccessControlClient.java |  67 ++++------
 .../hbase/security/access/AccessControlUtil.java   |  12 ++
 .../security/access/GetUserPermissionsRequest.java | 136 +++++++++++++++++++++
 .../security/access/ShadedAccessControlUtil.java   |  38 ++++++
 .../src/main/protobuf/AccessControl.proto          |   3 +
 .../src/main/protobuf/Master.proto                 |   2 +
 .../hadoop/hbase/coprocessor/MasterObserver.java   |  30 +++++
 .../hadoop/hbase/master/MasterCoprocessorHost.java |  20 +++
 .../hadoop/hbase/master/MasterRpcServices.java     |  57 +++++++++
 .../hbase/security/access/AccessControlLists.java  |  13 +-
 .../hbase/security/access/AccessController.java    | 100 ++++++---------
 .../client/TestAsyncAccessControlAdminApi.java     |  74 +++++++++++
 .../hbase/security/access/SecureTestUtil.java      |   6 +-
 .../security/access/TestAccessController.java      | 124 +++++--------------
 .../security/access/TestNamespaceCommands.java     |  15 ++-
 .../hadoop/hbase/thrift2/client/ThriftAdmin.java   |   7 ++
 23 files changed, 570 insertions(+), 215 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index b27e93d..33b44c3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
 import org.apache.hadoop.hbase.security.access.UserPermission;
 import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
@@ -2044,4 +2045,14 @@ public interface Admin extends Abortable, Closeable {
    * @throws IOException if a remote or network exception occurs
    */
   void revoke(UserPermission userPermission) throws IOException;
+
+  /**
+   * Get the global/namespace/table permissions for a user
+   * @param getUserPermissionsRequest A request that specifies the user and which global,
+   *          namespace or table permissions are needed
+   * @return The user and permission list
+   * @throws IOException if a remote or network exception occurs
+   */
+  List<UserPermission> getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest)
+      throws IOException;
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index f7adc16..3227f22 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotView;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
 import org.apache.hadoop.hbase.security.access.UserPermission;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -1447,4 +1448,13 @@ public interface AsyncAdmin {
    * @param userPermission user name and the specific permission
    */
   CompletableFuture<Void> revoke(UserPermission userPermission);
+
+  /**
+   * Get the global/namespace/table permissions for a user
+   * @param getUserPermissionsRequest A request that specifies the user and which global,
+   *          namespace or table permissions are needed
+   * @return The user and permission list
+   */
+  CompletableFuture<List<UserPermission>>
+      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest);
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 0eceaad..07b4311 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
 import org.apache.hadoop.hbase.security.access.UserPermission;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -812,4 +813,10 @@ class AsyncHBaseAdmin implements AsyncAdmin {
   public CompletableFuture<Void> revoke(UserPermission userPermission) {
     return wrap(rawAdmin.revoke(userPermission));
   }
+
+  @Override
+  public CompletableFuture<List<UserPermission>>
+      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
+    return wrap(rawAdmin.getUserPermissions(getUserPermissionsRequest));
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index ff2ff2f..6bcb499 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -93,6 +93,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
@@ -1795,6 +1797,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
           AccessControlProtos.RevokeRequest request) throws ServiceException {
         return stub.revoke(controller, request);
       }
+
+      @Override
+      public GetUserPermissionsResponse getUserPermissions(RpcController controller,
+          GetUserPermissionsRequest request) throws ServiceException {
+        return stub.getUserPermissions(controller, request);
+      }
     };
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 91f9584..442fd81 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
 import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
 import org.apache.hadoop.hbase.security.access.UserPermission;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -110,6 +111,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@@ -3839,6 +3841,24 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
+  public List<UserPermission>
+      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) throws IOException {
+    return executeCallable(
+      new MasterCallable<List<UserPermission>>(getConnection(), getRpcControllerFactory()) {
+        @Override
+        protected List<UserPermission> rpcCall() throws Exception {
+          AccessControlProtos.GetUserPermissionsRequest req =
+              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest);
+          AccessControlProtos.GetUserPermissionsResponse response =
+              this.master.getUserPermissions(getRpcController(), req);
+          return response.getUserPermissionList().stream()
+              .map(userPermission -> ShadedAccessControlUtil.toUserPermission(userPermission))
+              .collect(Collectors.toList());
+        }
+      });
+  }
+
+  @Override
   public void close() {
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 587c6e2..085fcf7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
 import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
 import org.apache.hadoop.hbase.security.access.UserPermission;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -106,6 +107,8 @@ import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
@@ -3782,4 +3785,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
           (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
         .call();
   }
+
+  @Override
+  public CompletableFuture<List<UserPermission>>
+      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
+    return this.<List<UserPermission>> newMasterCaller().action((controller,
+        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
+            List<UserPermission>> call(controller, stub,
+              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
+              (s, c, req, done) -> s.getUserPermissions(c, req, done),
+              resp -> resp.getUserPermissionList().stream()
+                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
+                .collect(Collectors.toList())))
+        .call();
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
index 090bcf9..6e80a54 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
@@ -21,6 +21,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
@@ -688,4 +690,10 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
       throws ServiceException {
     return stub.revoke(controller, request);
   }
+
+  @Override
+  public GetUserPermissionsResponse getUserPermissions(RpcController controller,
+      GetUserPermissionsRequest request) throws ServiceException {
+    return stub.getUserPermissions(controller, request);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
index c467dfb..046761e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
@@ -249,36 +249,27 @@ public class AccessControlClient {
    */
   public static List<UserPermission> getUserPermissions(Connection connection, String tableRegex,
       String userName) throws Throwable {
-    /**
-     * TODO: Pass an rpcController HBaseRpcController controller = ((ClusterConnection)
-     * connection).getRpcControllerFactory().newController();
-     */
     List<UserPermission> permList = new ArrayList<>();
-    try (Table table = connection.getTable(ACL_TABLE_NAME)) {
-      try (Admin admin = connection.getAdmin()) {
-        CoprocessorRpcChannel service = table.coprocessorService(HConstants.EMPTY_START_ROW);
-        BlockingInterface protocol =
-            AccessControlProtos.AccessControlService.newBlockingStub(service);
-        List<TableDescriptor> htds = null;
-        if (tableRegex == null || tableRegex.isEmpty()) {
-          permList = AccessControlUtil.getUserPermissions(null, protocol, userName);
-        } else if (tableRegex.charAt(0) == '@') { // Namespaces
-          String namespaceRegex = tableRegex.substring(1);
-          for (NamespaceDescriptor nsds : admin.listNamespaceDescriptors()) { // Read out all
-                                                                              // namespaces
-            String namespace = nsds.getName();
-            if (namespace.matches(namespaceRegex)) { // Match the given namespace regex?
-              permList.addAll(AccessControlUtil.getUserPermissions(null, protocol,
-                Bytes.toBytes(namespace), userName));
-            }
-          }
-        } else { // Tables
-          htds = admin.listTableDescriptors(Pattern.compile(tableRegex), true);
-          for (TableDescriptor htd : htds) {
-            permList.addAll(AccessControlUtil.getUserPermissions(null, protocol, htd.getTableName(),
-              null, null, userName));
+    try (Admin admin = connection.getAdmin()) {
+      if (tableRegex == null || tableRegex.isEmpty()) {
+        permList = admin.getUserPermissions(
+          GetUserPermissionsRequest.newBuilder().withUserName(userName).build());
+      } else if (tableRegex.charAt(0) == '@') { // Namespaces
+        String namespaceRegex = tableRegex.substring(1);
+        for (NamespaceDescriptor nsds : admin.listNamespaceDescriptors()) { // Read out all
+                                                                            // namespaces
+          String namespace = nsds.getName();
+          if (namespace.matches(namespaceRegex)) { // Match the given namespace regex?
+            permList.addAll(admin.getUserPermissions(
+              GetUserPermissionsRequest.newBuilder(namespace).withUserName(userName).build()));
           }
         }
+      } else { // Tables
+        List<TableDescriptor> htds = admin.listTableDescriptors(Pattern.compile(tableRegex), true);
+        for (TableDescriptor htd : htds) {
+          permList.addAll(admin.getUserPermissions(GetUserPermissionsRequest
+              .newBuilder(htd.getTableName()).withUserName(userName).build()));
+        }
       }
     }
     return permList;
@@ -347,22 +338,14 @@ public class AccessControlClient {
     if (tableRegex == null || tableRegex.isEmpty() || tableRegex.charAt(0) == '@') {
       throw new IllegalArgumentException("Table name can't be null or empty or a namespace.");
     }
-    /**
-     * TODO: Pass an rpcController HBaseRpcController controller = ((ClusterConnection)
-     * connection).getRpcControllerFactory().newController();
-     */
     List<UserPermission> permList = new ArrayList<UserPermission>();
-    try (Table table = connection.getTable(ACL_TABLE_NAME)) {
-      try (Admin admin = connection.getAdmin()) {
-        CoprocessorRpcChannel service = table.coprocessorService(HConstants.EMPTY_START_ROW);
-        BlockingInterface protocol =
-            AccessControlProtos.AccessControlService.newBlockingStub(service);
-        List<TableDescriptor> htds = admin.listTableDescriptors(Pattern.compile(tableRegex), true);
-        // Retrieve table permissions
-        for (TableDescriptor htd : htds) {
-          permList.addAll(AccessControlUtil.getUserPermissions(null, protocol, htd.getTableName(),
-            columnFamily, columnQualifier, userName));
-        }
+    try (Admin admin = connection.getAdmin()) {
+      List<TableDescriptor> htds = admin.listTableDescriptors(Pattern.compile(tableRegex), true);
+      // Retrieve table permissions
+      for (TableDescriptor htd : htds) {
+        permList.addAll(admin.getUserPermissions(
+          GetUserPermissionsRequest.newBuilder(htd.getTableName()).withFamily(columnFamily)
+              .withQualifier(columnQualifier).withUserName(userName).build()));
       }
     }
     return permList;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
index 05f173e..0220d89 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
@@ -654,7 +654,9 @@ public class AccessControlUtil {
    * @param controller RpcController
    * @param protocol the AccessControlService protocol proxy
    * @throws ServiceException on failure
+   * @deprecated Use {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
    */
+  @Deprecated
   public static List<UserPermission> getUserPermissions(RpcController controller,
       AccessControlService.BlockingInterface protocol) throws ServiceException {
     return getUserPermissions(controller, protocol, HConstants.EMPTY_STRING);
@@ -666,7 +668,9 @@ public class AccessControlUtil {
    * @param protocol the AccessControlService protocol proxy
    * @param userName User name, if empty then all user permissions will be retrieved.
    * @throws ServiceException
+   * @deprecated Use {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
    */
+  @Deprecated
   public static List<UserPermission> getUserPermissions(RpcController controller,
       AccessControlService.BlockingInterface protocol, String userName) throws ServiceException {
     AccessControlProtos.GetUserPermissionsRequest.Builder builder =
@@ -695,7 +699,9 @@ public class AccessControlUtil {
    * @param protocol the AccessControlService protocol proxy
    * @param t optional table name
    * @throws ServiceException
+   * @deprecated Use {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
    */
+  @Deprecated
   public static List<UserPermission> getUserPermissions(RpcController controller,
       AccessControlService.BlockingInterface protocol,
       TableName t) throws ServiceException {
@@ -712,7 +718,9 @@ public class AccessControlUtil {
    * @param columnQualifier Column qualifier
    * @param userName User name, if empty then all user permissions will be retrieved.
    * @throws ServiceException
+   * @deprecated Use {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
    */
+  @Deprecated
   public static List<UserPermission> getUserPermissions(RpcController controller,
       AccessControlService.BlockingInterface protocol, TableName t, byte[] columnFamily,
       byte[] columnQualifier, String userName) throws ServiceException {
@@ -751,7 +759,9 @@ public class AccessControlUtil {
    * @param protocol the AccessControlService protocol proxy
    * @param namespace name of the namespace
    * @throws ServiceException
+   * @deprecated Use {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
    */
+  @Deprecated
   public static List<UserPermission> getUserPermissions(RpcController controller,
       AccessControlService.BlockingInterface protocol,
       byte[] namespace) throws ServiceException {
@@ -765,7 +775,9 @@ public class AccessControlUtil {
    * @param namespace name of the namespace
    * @param userName User name, if empty then all user permissions will be retrieved.
    * @throws ServiceException
+   * @deprecated Use {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
    */
+  @Deprecated
   public static List<UserPermission> getUserPermissions(RpcController controller,
       AccessControlService.BlockingInterface protocol, byte[] namespace, String userName)
       throws ServiceException {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/GetUserPermissionsRequest.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/GetUserPermissionsRequest.java
new file mode 100644
index 0000000..8e1767c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/GetUserPermissionsRequest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.util.Objects;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used by
+ * {@link org.apache.hadoop.hbase.client.Admin#getUserPermissions(GetUserPermissionsRequest)}.
+ * Represents the parameters of the user permissions to fetch from HBase.
+ */
+@InterfaceAudience.Public
+public final class GetUserPermissionsRequest {
+  private String userName;
+  private String namespace;
+  private TableName tableName;
+  private byte[] family;
+  private byte[] qualifier;
+
+  private GetUserPermissionsRequest(String userName, String namespace, TableName tableName,
+      byte[] family, byte[] qualifier) {
+    this.userName = userName;
+    this.namespace = namespace;
+    this.tableName = tableName;
+    this.family = family;
+    this.qualifier = qualifier;
+  }
+
+  /**
+   * Build a get global permission request
+   * @return a get global permission request builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Build a get namespace permission request
+   * @param namespace the specific namespace
+   * @return a get namespace permission request builder
+   */
+  public static Builder newBuilder(String namespace) {
+    return new Builder(namespace);
+  }
+
+  /**
+   * Build a get table permission request
+   * @param tableName the specific table name
+   * @return a get table permission request builder
+   */
+  public static Builder newBuilder(TableName tableName) {
+    return new Builder(tableName);
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public String getNamespace() {
+    return namespace;
+  }
+
+  public TableName getTableName() {
+    return tableName;
+  }
+
+  public byte[] getFamily() {
+    return family;
+  }
+
+  public byte[] getQualifier() {
+    return qualifier;
+  }
+
+  public static final class Builder {
+    private String userName;
+    private String namespace;
+    private TableName tableName;
+    private byte[] family;
+    private byte[] qualifier;
+
+    private Builder() {
+    }
+
+    private Builder(String namespace) {
+      this.namespace = namespace;
+    }
+
+    private Builder(TableName tableName) {
+      this.tableName = tableName;
+    }
+
+    /**
+     * The user name may be null to get the global/namespace/table permissions of all users
+     */
+    public Builder withUserName(String userName) {
+      this.userName = userName;
+      return this;
+    }
+
+    public Builder withFamily(byte[] family) {
+      Objects.requireNonNull(tableName, "The tableName can't be NULL");
+      this.family = family;
+      return this;
+    }
+
+    public Builder withQualifier(byte[] qualifier) {
+      Objects.requireNonNull(tableName, "The tableName can't be NULL");
+      // Objects.requireNonNull(family, "The family can't be NULL");
+      this.qualifier = qualifier;
+      return this;
+    }
+
+    public GetUserPermissionsRequest build() {
+      return new GetUserPermissionsRequest(userName, namespace, tableName, family, qualifier);
+    }
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/ShadedAccessControlUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/ShadedAccessControlUtil.java
index b354d87..be3b75e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/ShadedAccessControlUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/ShadedAccessControlUtil.java
@@ -29,7 +29,9 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Permission.Type;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 
@@ -280,4 +282,40 @@ public class ShadedAccessControlUtil {
   public static RevokeRequest buildRevokeRequest(UserPermission userPermission) {
     return RevokeRequest.newBuilder().setUserPermission(toUserPermission(userPermission)).build();
   }
+
+  public static AccessControlProtos.GetUserPermissionsRequest
+      buildGetUserPermissionsRequest(GetUserPermissionsRequest request) {
+    AccessControlProtos.GetUserPermissionsRequest.Builder builder =
+        AccessControlProtos.GetUserPermissionsRequest.newBuilder();
+    if (request.getUserName() != null && !request.getUserName().isEmpty()) {
+      builder.setUserName(ByteString.copyFromUtf8(request.getUserName()));
+    }
+    if (request.getNamespace() != null && !request.getNamespace().isEmpty()) {
+      builder.setNamespaceName(ByteString.copyFromUtf8(request.getNamespace()));
+      builder.setType(Type.Namespace);
+    }
+    if (request.getTableName() != null) {
+      builder.setTableName(toProtoTableName(request.getTableName()));
+      builder.setType(Type.Table);
+    }
+    if (!builder.hasType()) {
+      builder.setType(Type.Global);
+    }
+    if (request.getFamily() != null && request.getFamily().length > 0) {
+      builder.setColumnFamily(ByteString.copyFrom(request.getFamily()));
+    }
+    if (request.getQualifier() != null && request.getQualifier().length > 0) {
+      builder.setColumnQualifier(ByteString.copyFrom(request.getQualifier()));
+    }
+    return builder.build();
+  }
+
+  public static GetUserPermissionsResponse
+      buildGetUserPermissionsResponse(final List<UserPermission> permissions) {
+    GetUserPermissionsResponse.Builder builder = GetUserPermissionsResponse.newBuilder();
+    for (UserPermission perm : permissions) {
+      builder.addUserPermission(toUserPermission(perm));
+    }
+    return builder.build();
+  }
 }
diff --git a/hbase-protocol-shaded/src/main/protobuf/AccessControl.proto b/hbase-protocol-shaded/src/main/protobuf/AccessControl.proto
index 3b0e9bb..af60fe2 100644
--- a/hbase-protocol-shaded/src/main/protobuf/AccessControl.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/AccessControl.proto
@@ -103,6 +103,9 @@ message GetUserPermissionsRequest {
   optional Permission.Type type = 1;
   optional TableName table_name = 2;
   optional bytes namespace_name = 3;
+  optional bytes column_family = 4;
+  optional bytes column_qualifier = 5;
+  optional bytes user_name = 6;
 }
 
 message GetUserPermissionsResponse {
diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto
index 4ed0ad5..d883b4c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto
@@ -1031,6 +1031,8 @@ service MasterService {
   rpc Grant(GrantRequest) returns (GrantResponse);
 
   rpc Revoke(RevokeRequest) returns (RevokeResponse);
+
+  rpc GetUserPermissions (GetUserPermissionsRequest) returns (GetUserPermissionsResponse);
 }
 
 // HBCK Service definitions.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index d8b7d4d..bfb1ada 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -1632,4 +1632,34 @@ public interface MasterObserver {
   default void postRevoke(ObserverContext<MasterCoprocessorEnvironment> ctx,
       UserPermission userPermission) throws IOException {
   }
+
+  /**
+   * Called before getting user permissions.
+   * @param ctx the coprocessor instance's environment
+   * @param userName the user name, or null to get the permissions of all users
+   * @param namespace the namespace, or null if namespace permissions are not requested
+   * @param tableName the table name, or null if table permissions are not requested
+   * @param family the column family, or null if family permissions are not requested
+   * @param qualifier the column qualifier, or null if qualifier permissions are not requested
+   * @throws IOException if something went wrong
+   */
+  default void preGetUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
+      String userName, String namespace, TableName tableName, byte[] family, byte[] qualifier)
+      throws IOException {
+  }
+
+  /**
+   * Called after getting user permissions.
+   * @param ctx the coprocessor instance's environment
+   * @param userName the user name, or null if the permissions of all users were requested
+   * @param namespace the namespace, or null if namespace permissions were not requested
+   * @param tableName the table name, or null if table permissions were not requested
+   * @param family the column family, or null if family permissions were not requested
+   * @param qualifier the column qualifier, or null if qualifier permissions were not requested
+   * @throws IOException if something went wrong
+   */
+  default void postGetUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
+      String userName, String namespace, TableName tableName, byte[] family, byte[] qualifier)
+      throws IOException {
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 8764143..bb9fc3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -1892,4 +1892,24 @@ public class MasterCoprocessorHost
       }
     });
   }
+
+  public void preGetUserPermissions(String userName, String namespace, TableName tableName,
+      byte[] family, byte[] qualifier) throws IOException { // pre-hook for getUserPermissions
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.preGetUserPermissions(this, userName, namespace, tableName, family, qualifier);
+      }
+    }); // the operation is null when coprocEnvironments is empty
+  }
+
+  public void postGetUserPermissions(String userName, String namespace, TableName tableName,
+      byte[] family, byte[] qualifier) throws IOException { // post-hook for getUserPermissions
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.postGetUserPermissions(this, userName, namespace, tableName, family, qualifier);
+      }
+    }); // the operation is null when coprocEnvironments is empty
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index b943000..08f89db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -91,11 +91,13 @@ import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
 import org.apache.hadoop.hbase.security.access.AccessController;
 import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
 import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
 import org.apache.hadoop.hbase.security.access.UserPermission;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController;
@@ -118,8 +120,12 @@ import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Permission.Type;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -2571,6 +2577,57 @@ public class MasterRpcServices extends RSRpcServices
     }
   }
 
+  @Override
+  public GetUserPermissionsResponse getUserPermissions(RpcController controller,
+      GetUserPermissionsRequest request) throws ServiceException {
+    try {
+      final String userName = request.hasUserName() ? request.getUserName().toStringUtf8() : null;
+      String namespace =
+          request.hasNamespaceName() ? request.getNamespaceName().toStringUtf8() : null;
+      TableName table =
+          request.hasTableName() ? ProtobufUtil.toTableName(request.getTableName()) : null;
+      byte[] cf = request.hasColumnFamily() ? request.getColumnFamily().toByteArray() : null;
+      byte[] cq = request.hasColumnQualifier() ? request.getColumnQualifier().toByteArray() : null;
+      Type permissionType = request.hasType() ? request.getType() : null;
+      if (master.cpHost != null) {
+        master.getMasterCoprocessorHost().preGetUserPermissions(userName, namespace, table, cf, cq);
+      }
+
+      // The ACL lookup is restricted to a single user only when the request names one.
+      final boolean filterUser = userName != null;
+      final List<UserPermission> perms;
+      if (permissionType == Type.Table) {
+        // For table lookups a requested column family turns filtering on as well.
+        perms = AccessControlLists.getUserTablePermissions(master.getConfiguration(), table, cf, cq,
+          userName, cf != null || filterUser);
+      } else if (permissionType == Type.Namespace) {
+        perms = AccessControlLists.getUserNamespacePermissions(master.getConfiguration(), namespace,
+          userName, filterUser);
+      } else {
+        perms = AccessControlLists.getUserPermissions(master.getConfiguration(), null, null, null,
+          userName, filterUser);
+        // AccessControlLists does not store superusers, so they are added here explicitly with
+        // every action; they are skipped when the request filters on a specific user.
+        if (!filterUser) {
+          for (String user : Superusers.getSuperUsers()) {
+            perms.add(new UserPermission(user,
+                Permission.newBuilder().withActions(Action.values()).build()));
+          }
+        }
+      }
+
+      if (master.cpHost != null) {
+        master.getMasterCoprocessorHost().postGetUserPermissions(userName, namespace, table, cf,
+          cq);
+      }
+      AccessControlProtos.GetUserPermissionsResponse response =
+          ShadedAccessControlUtil.buildGetUserPermissionsResponse(perms);
+      return response;
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
   private boolean containMetaWals(ServerName serverName) throws IOException {
     Path logDir = new Path(master.getWALRootDir(),
         AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
index 74c848d..1182baa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
@@ -538,8 +538,9 @@ public class AccessControlLists {
    * Returns the currently granted permissions for a given table as the specified user plus
    * associated permissions.
    */
-  static List<UserPermission> getUserTablePermissions(Configuration conf, TableName tableName,
-      byte[] cf, byte[] cq, String userName, boolean hasFilterUser) throws IOException {
+  public static List<UserPermission> getUserTablePermissions(Configuration conf,
+      TableName tableName, byte[] cf, byte[] cq, String userName, boolean hasFilterUser)
+      throws IOException {
     return getUserPermissions(conf, tableName == null ? null : tableName.getName(), cf, cq,
       userName, hasFilterUser);
   }
@@ -548,8 +549,8 @@ public class AccessControlLists {
    * Returns the currently granted permissions for a given namespace as the specified user plus
    * associated permissions.
    */
-  static List<UserPermission> getUserNamespacePermissions(Configuration conf, String namespace,
-      String user, boolean hasFilterUser) throws IOException {
+  public static List<UserPermission> getUserNamespacePermissions(Configuration conf,
+      String namespace, String user, boolean hasFilterUser) throws IOException {
     return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, user,
       hasFilterUser);
   }
@@ -566,8 +567,8 @@ public class AccessControlLists {
    * @return List of UserPermissions
    * @throws IOException on failure
    */
-  static List<UserPermission> getUserPermissions(Configuration conf, byte[] entryName, byte[] cf,
-      byte[] cq, String user, boolean hasFilterUser) throws IOException {
+  public static List<UserPermission> getUserPermissions(Configuration conf, byte[] entryName,
+      byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
     ListMultimap<String, UserPermission> allPerms =
         getPermissions(conf, entryName, null, cf, cq, user, hasFilterUser);
     List<UserPermission> perms = new ArrayList<>();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index a4a96c0..aaa2da5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -2135,6 +2135,10 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
     done.run(response);
   }
 
+  /**
+   * @deprecated Use {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
+   */
+  @Deprecated
   @Override
   public void getUserPermissions(RpcController controller,
       AccessControlProtos.GetUserPermissionsRequest request,
@@ -2147,77 +2151,29 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
           throw new CoprocessorException("AccessController not yet initialized");
         }
         User caller = RpcServer.getRequestUser().orElse(null);
-
-        List<UserPermission> perms = null;
-        // Initialize username, cf and cq. Set to null if request doesn't have.
         final String userName = request.hasUserName() ? request.getUserName().toStringUtf8() : null;
+        final String namespace =
+            request.hasNamespaceName() ? request.getNamespaceName().toStringUtf8() : null;
+        final TableName table =
+            request.hasTableName() ? ProtobufUtil.toTableName(request.getTableName()) : null;
         final byte[] cf =
             request.hasColumnFamily() ? request.getColumnFamily().toByteArray() : null;
         final byte[] cq =
             request.hasColumnQualifier() ? request.getColumnQualifier().toByteArray() : null;
-
+        preGetUserPermissions(caller, userName, namespace, table, cf, cq);
+        GetUserPermissionsRequest getUserPermissionsRequest = null;
         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
-          final TableName table = request.hasTableName() ?
-            ProtobufUtil.toTableName(request.getTableName()) : null;
-          accessChecker.requirePermission(caller, "userPermissions", table, cf, cq, userName,
-            Action.ADMIN);
-          perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
-            @Override
-            public List<UserPermission> run() throws Exception {
-              if (cf != null || userName != null) {
-                // retrieve permission based on the requested parameters
-                return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(),
-                  table, cf, cq, userName, true);
-              } else {
-                return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(),
-                  table, null, null, null, false);
-              }
-            }
-          });
+          getUserPermissionsRequest = GetUserPermissionsRequest.newBuilder(table).withFamily(cf)
+              .withQualifier(cq).withUserName(userName).build();
         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
-          final String namespace = request.getNamespaceName().toStringUtf8();
-          accessChecker.requireNamespacePermission(caller, "userPermissions",
-            namespace, userName, Action.ADMIN);
-          perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
-            @Override
-            public List<UserPermission> run() throws Exception {
-              if (userName != null) {
-                // retrieve permission based on the requested parameters
-                return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
-                  namespace, userName, true);
-              } else {
-                return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
-                  namespace, null, false);
-              }
-            }
-          });
+          getUserPermissionsRequest =
+              GetUserPermissionsRequest.newBuilder(namespace).withUserName(userName).build();
         } else {
-          accessChecker.requirePermission(caller, "userPermissions", userName, Action.ADMIN);
-          perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
-            @Override
-            public List<UserPermission> run() throws Exception {
-              if (userName != null) {
-                // retrieve permission based on the requested parameters
-                return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null,
-                  null, null, userName, true);
-              } else {
-                return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null,
-                  null, null, null, false);
-              }
-            }
-          });
-
-          // Skip super users when filter user is specified
-          if (userName == null) {
-            // Adding superusers explicitly to the result set as AccessControlLists do not store
-            // them. Also using acl as table name to be inline with the results of global admin and
-            // will help in avoiding any leakage of information about being superusers.
-            for (String user : Superusers.getSuperUsers()) {
-              perms.add(new UserPermission(user,
-                  Permission.newBuilder().withActions(Action.values()).build()));
-            }
-          }
+          getUserPermissionsRequest =
+              GetUserPermissionsRequest.newBuilder().withUserName(userName).build();
         }
+        List<UserPermission> perms =
+            regionEnv.getConnection().getAdmin().getUserPermissions(getUserPermissionsRequest);
         response = AccessControlUtil.buildGetUserPermissionsResponse(perms);
       } else {
         throw new CoprocessorException(AccessController.class, "This method "
@@ -2680,4 +2636,24 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
       accessChecker.performOnSuperuser(request, caller, userPermission.getUser());
     }
   }
+
+  @Override
+  public void preGetUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
+      String userName, String namespace, TableName tableName, byte[] family, byte[] qualifier)
+      throws IOException { // delegates the ADMIN check, made as the active user of ctx
+    preGetUserPermissions(getActiveUser(ctx), userName, namespace, tableName, family, qualifier);
+  }
+
+  private void preGetUserPermissions(User caller, String userName, String namespace,
+      TableName tableName, byte[] family, byte[] qualifier) throws IOException {
+    final String op = "getUserPermissions"; // request name handed to the access checker
+    if (tableName != null) {
+      accessChecker.requirePermission(caller, op, tableName, family, qualifier, userName,
+        Action.ADMIN);
+    } else if (namespace != null) {
+      accessChecker.requireNamespacePermission(caller, op, namespace, userName, Action.ADMIN);
+    } else {
+      accessChecker.requirePermission(caller, op, userName, Action.ADMIN);
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAccessControlAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAccessControlAdminApi.java
new file mode 100644
index 0000000..8075ae0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAccessControlAdminApi.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+@Category({ ClientTests.class, SmallTests.class })
+public class TestAsyncAccessControlAdminApi extends TestAsyncAdminBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestAsyncAccessControlAdminApi.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+  }
+
+  @Test
+  public void testGrant() throws Exception {
+    TableName tableName = TableName.valueOf("test-table");
+    String user = "test-user";
+    UserPermission userPermission = new UserPermission(user,
+        Permission.newBuilder(tableName).withActions(Permission.Action.READ).build());
+    // Grant READ on the table to the test user.
+    admin.grant(userPermission, false).get();
+
+    // Listing the table's permissions without a user filter returns exactly that grant.
+    List<UserPermission> userPermissions =
+        admin.getUserPermissions(GetUserPermissionsRequest.newBuilder(tableName).build()).get();
+    assertEquals(1, userPermissions.size());
+    assertEquals(userPermission, userPermissions.get(0));
+
+    // Filtering on the granted user returns the same single permission.
+    userPermissions = admin.getUserPermissions(
+      GetUserPermissionsRequest.newBuilder(tableName).withUserName(user).build()).get();
+    assertEquals(1, userPermissions.size());
+    assertEquals(userPermission, userPermissions.get(0));
+    // Filtering on a user that was never granted anything returns no permissions.
+    userPermissions = admin.getUserPermissions(
+      GetUserPermissionsRequest.newBuilder(tableName).withUserName("u").build()).get();
+    assertEquals(0, userPermissions.size());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
index 129f2b6..eecd773 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
@@ -144,7 +144,8 @@ public class SecureTestUtil {
       Permission.Action... actions) throws IOException {
     Permission[] perms = new Permission[actions.length];
     for (int i = 0; i < actions.length; i++) {
-      perms[i] = new TablePermission(table, family, column, actions[i]);
+      perms[i] = Permission.newBuilder(table).withFamily(family).withQualifier(column)
+          .withActions(actions[i]).build();
     }
 
     checkTablePerms(conf, table, perms);
@@ -878,7 +879,8 @@ public class SecureTestUtil {
       byte[] column, Permission.Action... actions) throws IOException {
     Permission[] perms = new Permission[actions.length];
     for (int i = 0; i < actions.length; i++) {
-      perms[i] = new TablePermission(table, family, column, actions[i]);
+      perms[i] = Permission.newBuilder(table).withFamily(family).withQualifier(column)
+          .withActions(actions[i]).build();
     }
     checkTablePerms(testUtil, table, perms);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 0d7a0e0..563238c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -1193,12 +1193,9 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction getTablePermissionsAction = new AccessTestAction() {
       @Override
       public Object run() throws Exception {
-        try(Connection conn = ConnectionFactory.createConnection(conf);
-            Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)){
-          BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName());
-          AccessControlService.BlockingInterface protocol =
-              AccessControlService.newBlockingStub(service);
-          AccessControlUtil.getUserPermissions(null, protocol, TEST_TABLE);
+        try (Connection conn = ConnectionFactory.createConnection(conf)) {
+          conn.getAdmin()
+              .getUserPermissions(GetUserPermissionsRequest.newBuilder(TEST_TABLE).build());
         }
         return null;
       }
@@ -1207,12 +1204,9 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction getGlobalPermissionsAction = new AccessTestAction() {
       @Override
       public Object run() throws Exception {
-        try(Connection conn = ConnectionFactory.createConnection(conf);
+        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
-          BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
-          AccessControlService.BlockingInterface protocol =
-            AccessControlService.newBlockingStub(service);
-          AccessControlUtil.getUserPermissions(null, protocol);
+          conn.getAdmin().getUserPermissions(GetUserPermissionsRequest.newBuilder().build());
         }
         return null;
       }
@@ -1682,17 +1676,8 @@ public class TestAccessController extends SecureTestUtil {
     htd.setOwner(USER_OWNER);
     createTable(TEST_UTIL, htd);
     try {
-      List<UserPermission> perms;
-      Table acl = systemUserConnection.getTable(AccessControlLists.ACL_TABLE_NAME);
-      try {
-        BlockingRpcChannel service = acl.coprocessorService(tableName.getName());
-        AccessControlService.BlockingInterface protocol =
-            AccessControlService.newBlockingStub(service);
-        perms = AccessControlUtil.getUserPermissions(null, protocol, tableName);
-      } finally {
-        acl.close();
-      }
-
+      List<UserPermission> perms =
+          admin.getUserPermissions(GetUserPermissionsRequest.newBuilder(tableName).build());
       UserPermission ownerperm = new UserPermission(USER_OWNER.getName(),
           Permission.newBuilder(tableName).withActions(Action.values()).build());
       assertTrue("Owner should have all permissions on table",
@@ -1711,16 +1696,7 @@ public class TestAccessController extends SecureTestUtil {
       grantOnTable(TEST_UTIL, user.getShortName(), tableName, family1, qualifier,
         Permission.Action.READ);
 
-      acl = systemUserConnection.getTable(AccessControlLists.ACL_TABLE_NAME);
-      try {
-        BlockingRpcChannel service = acl.coprocessorService(tableName.getName());
-        AccessControlService.BlockingInterface protocol =
-            AccessControlService.newBlockingStub(service);
-        perms = AccessControlUtil.getUserPermissions(null, protocol, tableName);
-      } finally {
-        acl.close();
-      }
-
+      perms = admin.getUserPermissions(GetUserPermissionsRequest.newBuilder(tableName).build());
       UserPermission upToVerify =
           new UserPermission(userName, Permission.newBuilder(tableName).withFamily(family1)
               .withQualifier(qualifier).withActions(Permission.Action.READ).build());
@@ -1736,16 +1712,7 @@ public class TestAccessController extends SecureTestUtil {
       grantOnTable(TEST_UTIL, user.getShortName(), tableName, family1, qualifier,
         Permission.Action.WRITE, Permission.Action.READ);
 
-      acl = systemUserConnection.getTable(AccessControlLists.ACL_TABLE_NAME);
-      try {
-        BlockingRpcChannel service = acl.coprocessorService(tableName.getName());
-        AccessControlService.BlockingInterface protocol =
-            AccessControlService.newBlockingStub(service);
-        perms = AccessControlUtil.getUserPermissions(null, protocol, tableName);
-      } finally {
-        acl.close();
-      }
-
+      perms = admin.getUserPermissions(GetUserPermissionsRequest.newBuilder(tableName).build());
       upToVerify = new UserPermission(userName,
           Permission.newBuilder(tableName).withFamily(family1).withQualifier(qualifier)
               .withActions(Permission.Action.WRITE, Permission.Action.READ).build());
@@ -1756,16 +1723,7 @@ public class TestAccessController extends SecureTestUtil {
       revokeFromTable(TEST_UTIL, user.getShortName(), tableName, family1, qualifier,
         Permission.Action.WRITE, Permission.Action.READ);
 
-      acl = systemUserConnection.getTable(AccessControlLists.ACL_TABLE_NAME);
-      try {
-        BlockingRpcChannel service = acl.coprocessorService(tableName.getName());
-        AccessControlService.BlockingInterface protocol =
-            AccessControlService.newBlockingStub(service);
-        perms = AccessControlUtil.getUserPermissions(null, protocol, tableName);
-      } finally {
-        acl.close();
-      }
-
+      perms = admin.getUserPermissions(GetUserPermissionsRequest.newBuilder(tableName).build());
       assertFalse("User should not be granted permission: " + upToVerify.toString(),
         hasFoundUserPermission(upToVerify, perms));
 
@@ -1776,16 +1734,7 @@ public class TestAccessController extends SecureTestUtil {
       htd.setOwner(newOwner);
       admin.modifyTable(htd);
 
-      acl = systemUserConnection.getTable(AccessControlLists.ACL_TABLE_NAME);
-      try {
-        BlockingRpcChannel service = acl.coprocessorService(tableName.getName());
-        AccessControlService.BlockingInterface protocol =
-            AccessControlService.newBlockingStub(service);
-        perms = AccessControlUtil.getUserPermissions(null, protocol, tableName);
-      } finally {
-        acl.close();
-      }
-
+      perms = admin.getUserPermissions(GetUserPermissionsRequest.newBuilder(tableName).build());
       UserPermission newOwnerperm = new UserPermission(newOwner.getName(),
           Permission.newBuilder(tableName).withActions(Action.values()).build());
       assertTrue("New owner should have all permissions on table",
@@ -1798,16 +1747,8 @@ public class TestAccessController extends SecureTestUtil {
 
   @Test
   public void testGlobalPermissionList() throws Exception {
-    List<UserPermission> perms;
-    Table acl = systemUserConnection.getTable(AccessControlLists.ACL_TABLE_NAME);
-    try {
-      BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
-      AccessControlService.BlockingInterface protocol =
-        AccessControlService.newBlockingStub(service);
-      perms = AccessControlUtil.getUserPermissions(null, protocol);
-    } finally {
-      acl.close();
-    }
+    List<UserPermission> perms = systemUserConnection.getAdmin()
+        .getUserPermissions(GetUserPermissionsRequest.newBuilder().build());
 
     Collection<String> superUsers = Superusers.getSuperUsers();
     List<UserPermission> adminPerms = new ArrayList<>(superUsers.size() + 1);
@@ -1903,9 +1844,12 @@ public class TestAccessController extends SecureTestUtil {
       AccessTestAction multiQualifierRead = new AccessTestAction() {
         @Override
         public Void run() throws Exception {
-          checkTablePerms(TEST_UTIL, TEST_TABLE, new Permission[] {
-              new TablePermission(TEST_TABLE, TEST_FAMILY, TEST_Q1, Permission.Action.READ),
-              new TablePermission(TEST_TABLE, TEST_FAMILY, TEST_Q2, Permission.Action.READ), });
+          checkTablePerms(TEST_UTIL, TEST_TABLE,
+            new Permission[] {
+                Permission.newBuilder(TEST_TABLE).withFamily(TEST_FAMILY).withQualifier(TEST_Q1)
+                    .withActions(Permission.Action.READ).build(),
+                Permission.newBuilder(TEST_TABLE).withFamily(TEST_FAMILY).withQualifier(TEST_Q2)
+                    .withActions(Permission.Action.READ).build(), });
           return null;
         }
       };
@@ -1915,7 +1859,7 @@ public class TestAccessController extends SecureTestUtil {
         public Void run() throws Exception {
           checkTablePerms(TEST_UTIL, TEST_TABLE, new Permission[] {
               new Permission(Permission.Action.READ),
-              new TablePermission(TEST_TABLE, null, (byte[]) null, Permission.Action.READ), });
+              Permission.newBuilder(TEST_TABLE).withActions(Permission.Action.READ).build() });
           return null;
         }
       };
@@ -3545,12 +3489,9 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction globalUserPermissionAction = new AccessTestAction() {
       @Override
       public Object run() throws Exception {
-        try (Connection conn = ConnectionFactory.createConnection(conf);
-            Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
-          BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName());
-          AccessControlService.BlockingInterface protocol =
-              AccessControlService.newBlockingStub(service);
-          AccessControlUtil.getUserPermissions(null, protocol, "dummy");
+        try (Connection conn = ConnectionFactory.createConnection(conf)) {
+          conn.getAdmin().getUserPermissions(
+            GetUserPermissionsRequest.newBuilder().withUserName("dummy").build());
         }
         return null;
       }
@@ -3584,12 +3525,9 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction namespaceUserPermissionAction = new AccessTestAction() {
       @Override
       public Object run() throws Exception {
-        try (Connection conn = ConnectionFactory.createConnection(conf);
-            Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
-          BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName());
-          AccessControlService.BlockingInterface protocol =
-              AccessControlService.newBlockingStub(service);
-          AccessControlUtil.getUserPermissions(null, protocol, Bytes.toBytes(namespace1), "dummy");
+        try (Connection conn = ConnectionFactory.createConnection(conf)) {
+          conn.getAdmin().getUserPermissions(
+            GetUserPermissionsRequest.newBuilder(namespace1).withUserName("dummy").build());
         }
         return null;
       }
@@ -3628,13 +3566,9 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction tableUserPermissionAction = new AccessTestAction() {
       @Override
       public Object run() throws Exception {
-        try (Connection conn = ConnectionFactory.createConnection(conf);
-            Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
-          BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName());
-          AccessControlService.BlockingInterface protocol =
-              AccessControlService.newBlockingStub(service);
-          AccessControlUtil.getUserPermissions(null, protocol, TEST_TABLE, TEST_FAMILY,
-            TEST_QUALIFIER, "dummy");
+        try (Connection conn = ConnectionFactory.createConnection(conf)) {
+          conn.getAdmin().getUserPermissions(GetUserPermissionsRequest.newBuilder(TEST_TABLE)
+              .withFamily(TEST_FAMILY).withQualifier(TEST_QUALIFIER).withUserName("dummy").build());
         }
         return null;
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java
index 82d0f6e..f6b018e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java
@@ -406,12 +406,9 @@ public class TestNamespaceCommands extends SecureTestUtil {
     AccessTestAction getPermissionsAction = new AccessTestAction() {
       @Override
       public Object run() throws Exception {
-        try (Connection connection = ConnectionFactory.createConnection(conf);
-            Table acl = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
-          BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
-          AccessControlService.BlockingInterface protocol =
-              AccessControlService.newBlockingStub(service);
-          AccessControlUtil.getUserPermissions(null, protocol, Bytes.toBytes(TEST_NAMESPACE));
+        try (Connection connection = ConnectionFactory.createConnection(conf)) {
+          connection.getAdmin()
+              .getUserPermissions(GetUserPermissionsRequest.newBuilder(TEST_NAMESPACE).build());
         }
         return null;
       }
@@ -421,7 +418,8 @@ public class TestNamespaceCommands extends SecureTestUtil {
       @Override
       public Object run() throws Exception {
         ACCESS_CONTROLLER.preGrant(ObserverContextImpl.createAndPrepare(CP_ENV),
-          new UserPermission(testUser, new NamespacePermission(TEST_NAMESPACE, Action.WRITE)),
+          new UserPermission(testUser,
+              Permission.newBuilder(TEST_NAMESPACE).withActions(Action.WRITE).build()),
           false);
         return null;
       }
@@ -430,7 +428,8 @@ public class TestNamespaceCommands extends SecureTestUtil {
       @Override
       public Object run() throws Exception {
         ACCESS_CONTROLLER.preRevoke(ObserverContextImpl.createAndPrepare(CP_ENV),
-          new UserPermission(testUser, new NamespacePermission(TEST_NAMESPACE, Action.WRITE)));
+          new UserPermission(testUser,
+              Permission.newBuilder(TEST_NAMESPACE).withActions(Action.WRITE).build()));
         return null;
       }
     };
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 85adf7d..7b96a28 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
 import org.apache.hadoop.hbase.security.access.UserPermission;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
 import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
@@ -1104,4 +1105,10 @@ public class ThriftAdmin implements Admin {
   public void revoke(UserPermission userPermission) {
     throw new NotImplementedException("revoke not supported in ThriftAdmin");
   }
+
+  @Override
+  public List<UserPermission>
+      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
+    throw new NotImplementedException("getUserPermissions not supported in ThriftAdmin");
+  }
 }