You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/25 04:42:54 UTC

[iotdb] branch master updated: [IOTDB-3083] DataNode AuthorInfo cache (#5943)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c115adde55 [IOTDB-3083] DataNode AuthorInfo cache (#5943)
c115adde55 is described below

commit c115adde559c04e60bd0ac158afc982519c9d30a
Author: 任宇华 <79...@users.noreply.github.com>
AuthorDate: Wed May 25 12:42:47 2022 +0800

    [IOTDB-3083] DataNode AuthorInfo cache (#5943)
---
 .../iotdb/confignode/manager/ConfigManager.java    |  14 +-
 .../apache/iotdb/confignode/manager/Manager.java   |   5 +-
 .../confignode/manager/PermissionManager.java      |  79 ++++-
 .../iotdb/confignode/persistence/AuthorInfo.java   |  96 +++++-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   5 +-
 .../confignode/persistence/AuthorInfoTest.java     |  10 +-
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  |   4 +-
 .../Administration-Management/Administration.md    |   6 +-
 .../Administration-Management/Administration.md    |   6 +-
 pom.xml                                            |   7 +
 .../resources/conf/iotdb-engine.properties         |   8 +
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |  54 +---
 .../AuthorityFetcher.java}                         | 100 +++----
 .../apache/iotdb/db/auth/AuthorizerManager.java    | 330 +++++++++++++++++----
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  13 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  22 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  14 +
 .../execution/config/AuthorizerConfigTask.java     | 107 +------
 .../thrift/impl/DataNodeTSIServiceImpl.java        |   3 +-
 .../service/thrift/impl/InternalServiceImpl.java   |  11 +
 .../iotdb/db/auth/AuthorizerManagerTest.java       | 164 ++++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../src/main/thrift/confignode.thrift              |  22 +-
 thrift/src/main/thrift/mpp.thrift                  |  12 +
 24 files changed, 790 insertions(+), 303 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 9b2001ee0a..34477bbd6c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.persistence.executor.ConfigRequestExecutor;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
@@ -503,22 +504,27 @@ public class ConfigManager implements Manager {
   }
 
   @Override
-  public TSStatus login(String username, String password) {
+  public TPermissionInfoResp login(String username, String password) {
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return permissionManager.login(username, password);
     } else {
-      return status;
+      TPermissionInfoResp permissionInfoResp = new TPermissionInfoResp();
+      permissionInfoResp.setStatus(status);
+      return permissionInfoResp;
     }
   }
 
   @Override
-  public TSStatus checkUserPrivileges(String username, List<String> paths, int permission) {
+  public TPermissionInfoResp checkUserPrivileges(
+      String username, List<String> paths, int permission) {
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return permissionManager.checkUserPrivileges(username, paths, permission);
     } else {
-      return status;
+      TPermissionInfoResp permissionInfoResp = new TPermissionInfoResp();
+      permissionInfoResp.setStatus(status);
+      return permissionInfoResp;
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index f496541603..16a07d050a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionInter
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 
@@ -199,10 +200,10 @@ public interface Manager {
   DataSet queryPermission(ConfigRequest configRequest);
 
   /** login */
-  TSStatus login(String username, String password);
+  TPermissionInfoResp login(String username, String password);
 
   /** Check User Privileges */
-  TSStatus checkUserPrivileges(String username, List<String> paths, int permission);
+  TPermissionInfoResp checkUserPrivileges(String username, List<String> paths, int permission);
 
   /**
    * Register ConfigNode when it is first startup
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 7cbae5a3d8..d27060c5f9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -19,20 +19,42 @@
 
 package org.apache.iotdb.confignode.manager;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
 import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 import java.util.List;
 
 /** manager permission query and operation */
 public class PermissionManager {
 
-  private final Manager configManager;
+  private static final Logger logger = LoggerFactory.getLogger(PermissionManager.class);
+
+  private final ConfigManager configManager;
   private final AuthorInfo authorInfo;
+  private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      INTERNAL_SERVICE_CLIENT_MANAGER =
+          new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+              .createClientManager(
+                  new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
 
-  public PermissionManager(Manager configManager, AuthorInfo authorInfo) {
+  public PermissionManager(ConfigManager configManager, AuthorInfo authorInfo) {
     this.configManager = configManager;
     this.authorInfo = authorInfo;
   }
@@ -44,7 +66,18 @@ public class PermissionManager {
    * @return TSStatus
    */
   public TSStatus operatePermission(AuthorReq authorReq) {
-    return getConsensusManager().write(authorReq).getStatus();
+    TSStatus tsStatus;
+    // If the permissions change, clear the cache content affected by the operation
+    if (authorReq.getAuthorType() == ConfigRequestType.CreateUser
+        || authorReq.getAuthorType() == ConfigRequestType.CreateRole) {
+      tsStatus = getConsensusManager().write(authorReq).getStatus();
+    } else {
+      tsStatus = invalidateCache(authorReq.getUserName(), authorReq.getRoleName());
+      if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        tsStatus = getConsensusManager().write(authorReq).getStatus();
+      }
+    }
+    return tsStatus;
   }
 
   /**
@@ -61,11 +94,47 @@ public class PermissionManager {
     return configManager.getConsensusManager();
   }
 
-  public TSStatus login(String username, String password) {
+  public TPermissionInfoResp login(String username, String password) {
     return authorInfo.login(username, password);
   }
 
-  public TSStatus checkUserPrivileges(String username, List<String> paths, int permission) {
+  public TPermissionInfoResp checkUserPrivileges(
+      String username, List<String> paths, int permission) {
     return authorInfo.checkUserPrivileges(username, paths, permission);
   }
+
+  /**
+   * When the permission information of a user or role changes, clear the cached permissions
+   * related to that user or role on all DataNodes.
+   */
+  public TSStatus invalidateCache(String username, String roleName) {
+    List<TDataNodeInfo> allDataNodes = configManager.getNodeManager().getOnlineDataNodes(-1);
+    TInvalidatePermissionCacheReq req = new TInvalidatePermissionCacheReq();
+    TSStatus status;
+    req.setUsername(username);
+    req.setRoleName(roleName);
+    for (TDataNodeInfo dataNodeInfo : allDataNodes) {
+      TEndPoint internalEndPoint = dataNodeInfo.getLocation().getInternalEndPoint();
+      try {
+        status =
+            INTERNAL_SERVICE_CLIENT_MANAGER
+                .borrowClient(internalEndPoint)
+                .invalidatePermissionCache(req);
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          status.setMessage(
+              "datanode cache initialization failed, ip: "
+                  + internalEndPoint.getIp()
+                  + ", port: "
+                  + internalEndPoint.getPort());
+          return status;
+        }
+      } catch (IOException | TException e) {
+        logger.error("Failed to initialize cache, the error is {}", e);
+        return RpcUtils.getStatus(
+            TSStatusCode.INVALIDATE_PERMISSION_CACHE_ERROR,
+            "Failed to initialize cache, the error is " + e.getMessage());
+      }
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
index 3c8563b10f..40df733487 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
@@ -36,6 +36,9 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
 import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRoleResp;
+import org.apache.iotdb.confignode.rpc.thrift.TUserResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -58,7 +61,7 @@ public class AuthorInfo implements SnapshotProcessor {
 
   private IAuthorizer authorizer;
 
-  {
+  public AuthorInfo() {
     try {
       authorizer = BasicAuthorizer.getInstance();
     } catch (AuthException e) {
@@ -66,29 +69,40 @@ public class AuthorInfo implements SnapshotProcessor {
     }
   }
 
-  public TSStatus login(String username, String password) {
+  public TPermissionInfoResp login(String username, String password) {
     boolean status;
     String loginMessage = null;
     TSStatus tsStatus = new TSStatus();
+    TPermissionInfoResp result = new TPermissionInfoResp();
     try {
       status = authorizer.login(username, password);
+      if (status) {
+        // Bring this user's permission information back to the datanode for caching
+        result = getUserPermissionInfo(username);
+        result.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully"));
+      } else {
+        result.setUserInfo(new TUserResp("", "", new ArrayList<>(), new ArrayList<>()));
+        Map<String, TRoleResp> roleInfo = new HashMap<>();
+        roleInfo.put("", new TRoleResp("", new ArrayList<>()));
+        result.setRoleInfo(roleInfo);
+      }
     } catch (AuthException e) {
-      logger.info("meet error while logging in.", e);
+      logger.error("meet error while logging in.", e);
       status = false;
       loginMessage = e.getMessage();
     }
-    if (status) {
-      tsStatus.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      tsStatus.setMessage("Login successfully");
-    } else {
+    if (!status) {
       tsStatus.setMessage(loginMessage != null ? loginMessage : "Authentication failed.");
       tsStatus.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.getStatusCode());
+      result.setStatus(tsStatus);
     }
-    return tsStatus;
+    return result;
   }
 
-  public TSStatus checkUserPrivileges(String username, List<String> paths, int permission) {
+  public TPermissionInfoResp checkUserPrivileges(
+      String username, List<String> paths, int permission) {
     boolean status = true;
+    TPermissionInfoResp result = new TPermissionInfoResp();
     try {
       for (String path : paths) {
         if (!checkOnePath(username, path, permission)) {
@@ -99,9 +113,23 @@ public class AuthorInfo implements SnapshotProcessor {
       status = false;
     }
     if (status) {
-      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+      try {
+        // Bring this user's permission information back to the datanode for caching
+        result = getUserPermissionInfo(username);
+        result.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      } catch (AuthException e) {
+        result.setStatus(
+            RpcUtils.getStatus(TSStatusCode.EXECUTE_PERMISSION_EXCEPTION_ERROR, e.getMessage()));
+        return result;
+      }
+      return result;
     } else {
-      return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR);
+      result.setUserInfo(new TUserResp("", "", new ArrayList<>(), new ArrayList<>()));
+      Map<String, TRoleResp> roleInfo = new HashMap<>();
+      roleInfo.put("", new TRoleResp("", new ArrayList<>()));
+      result.setRoleInfo(roleInfo);
+      result.setStatus(RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR));
+      return result;
     }
   }
 
@@ -362,4 +390,50 @@ public class AuthorInfo implements SnapshotProcessor {
     }
     authorizer.reset();
   }
+
+  /**
+   * Collect the user's permission information so it can be returned to the DataNode for caching
+   *
+   * @param username The username of the user that needs to be cached
+   */
+  public TPermissionInfoResp getUserPermissionInfo(String username) throws AuthException {
+    TPermissionInfoResp result = new TPermissionInfoResp();
+    TUserResp tUserResp = new TUserResp();
+    TRoleResp tRoleResp = new TRoleResp();
+    Map<String, TRoleResp> tRoleRespMap = new HashMap();
+    List<String> userPrivilegeList = new ArrayList<>();
+    List<String> rolePrivilegeList = new ArrayList<>();
+
+    // User permission information
+    User user = authorizer.getUser(username);
+    if (user.getPrivilegeList() != null) {
+      for (PathPrivilege pathPrivilege : user.getPrivilegeList()) {
+        userPrivilegeList.add(pathPrivilege.getPath());
+        String privilegeIdList = pathPrivilege.getPrivileges().toString();
+        userPrivilegeList.add(privilegeIdList.substring(1, privilegeIdList.length() - 1));
+      }
+      tUserResp.setUsername(user.getName());
+      tUserResp.setPassword(user.getPassword());
+      tUserResp.setPrivilegeList(userPrivilegeList);
+      tUserResp.setRoleList(user.getRoleList());
+    }
+
+    // Permission information for roles owned by users
+    if (user.getRoleList() != null) {
+      for (String roleName : user.getRoleList()) {
+        Role role = authorizer.getRole(roleName);
+        tRoleResp.setRoleName(roleName);
+        for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+          rolePrivilegeList.add(pathPrivilege.getPath());
+          String privilegeIdList = pathPrivilege.getPrivileges().toString();
+          rolePrivilegeList.add(privilegeIdList.substring(1, privilegeIdList.length() - 1));
+        }
+        tRoleResp.setPrivilegeList(rolePrivilegeList);
+        tRoleRespMap.put(roleName, tRoleResp);
+      }
+    }
+    result.setUserInfo(tUserResp);
+    result.setRoleInfo(tRoleRespMap);
+    return result;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 3b20df5131..6cc00fb56f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
@@ -356,12 +357,12 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
   }
 
   @Override
-  public TSStatus login(TLoginReq req) throws TException {
+  public TPermissionInfoResp login(TLoginReq req) throws TException {
     return configManager.login(req.getUserrname(), req.getPassword());
   }
 
   @Override
-  public TSStatus checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
+  public TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
     return configManager.checkUserPrivileges(
         req.getUsername(), req.getPaths(), req.getPermission());
   }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
index 1bf52a65b3..2e2d96ef3f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
@@ -112,7 +112,10 @@ public class AuthorInfoTest {
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
     // check user privileges
-    status = authorInfo.checkUserPrivileges("user0", paths, PrivilegeType.DELETE_USER.ordinal());
+    status =
+        authorInfo
+            .checkUserPrivileges("user0", paths, PrivilegeType.DELETE_USER.ordinal())
+            .getStatus();
     Assert.assertEquals(TSStatusCode.NO_PERMISSION_ERROR.getStatusCode(), status.getCode());
 
     // drop user
@@ -163,7 +166,10 @@ public class AuthorInfoTest {
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
     // check user privileges
-    status = authorInfo.checkUserPrivileges("user0", paths, PrivilegeType.DELETE_USER.ordinal());
+    status =
+        authorInfo
+            .checkUserPrivileges("user0", paths, PrivilegeType.DELETE_USER.ordinal())
+            .getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
     // grant role
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 371993af04..929145250b 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -611,7 +611,7 @@ public class ConfigNodeRPCServiceProcessorTest {
     // check user privileges
     checkUserPrivilegesReq =
         new TCheckUserPrivilegesReq("tempuser0", paths, PrivilegeType.DELETE_USER.ordinal());
-    status = processor.checkUserPrivileges(checkUserPrivilegesReq);
+    status = processor.checkUserPrivileges(checkUserPrivilegesReq).getStatus();
     Assert.assertEquals(TSStatusCode.NO_PERMISSION_ERROR.getStatusCode(), status.getCode());
 
     // drop user
@@ -707,7 +707,7 @@ public class ConfigNodeRPCServiceProcessorTest {
     // check user privileges
     checkUserPrivilegesReq =
         new TCheckUserPrivilegesReq("tempuser0", paths, PrivilegeType.DELETE_USER.ordinal());
-    status = processor.checkUserPrivileges(checkUserPrivilegesReq);
+    status = processor.checkUserPrivileges(checkUserPrivilegesReq).getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
     // grant role
diff --git a/docs/UserGuide/Administration-Management/Administration.md b/docs/UserGuide/Administration-Management/Administration.md
index 6411080736..ae91051e00 100644
--- a/docs/UserGuide/Administration-Management/Administration.md
+++ b/docs/UserGuide/Administration-Management/Administration.md
@@ -343,4 +343,8 @@ IoTDB specifies that the character length of a role name should have no less tha
 A path pattern's result set contains all the elements of its sub pattern's
 result set. For example, `root.sg.d.*` is a sub pattern of
 `root.sg.*.*`, while `root.sg.**` is not a sub pattern of
-`root.sg.*.*`. When a user is granted privilege on a pattern, the pattern used in his DDL or DML must be a sub pattern of the privilege pattern, which guarantees that the user won't access the timeseries exceed his privilege scope.
\ No newline at end of file
+`root.sg.*.*`. When a user is granted privilege on a pattern, the pattern used in his DDL or DML must be a sub pattern of the privilege pattern, which guarantees that the user won't access the timeseries exceed his privilege scope.
+
+### Permission cache
+
+In a distributed deployment, every permission change other than creating a user or a role first clears the cached permission information related to that user (role) on all `dataNode`s. If clearing the cache fails on any `dataNode`, the permission change fails.
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Administration-Management/Administration.md b/docs/zh/UserGuide/Administration-Management/Administration.md
index 74a46ea08a..2b1f3a0374 100644
--- a/docs/zh/UserGuide/Administration-Management/Administration.md
+++ b/docs/zh/UserGuide/Administration-Management/Administration.md
@@ -339,4 +339,8 @@ IoTDB 规定角色名的字符长度不小于 4,其中角色名不能包含空
 
 ### 权限管理中的路径模式
 
-一个路径模式的结果集包含了它的子模式的结果集的所有元素。例如,`root.sg.d.*`是`root.sg.*.*`的子模式,而`root.sg.**`不是`root.sg.*.*`的子模式。当用户被授予对某个路径模式的权限时,在他的DDL或DML中使用的模式必须是该路径模式的子模式,这保证了用户访问时间序列时不会超出他的权限范围。
\ No newline at end of file
+一个路径模式的结果集包含了它的子模式的结果集的所有元素。例如,`root.sg.d.*`是`root.sg.*.*`的子模式,而`root.sg.**`不是`root.sg.*.*`的子模式。当用户被授予对某个路径模式的权限时,在他的DDL或DML中使用的模式必须是该路径模式的子模式,这保证了用户访问时间序列时不会超出他的权限范围。
+
+### 权限缓存
+
+在分布式相关的权限操作中，在进行除了创建用户和角色之外的其他权限更改操作时，都会先清除与该用户（角色）相关的所有的`dataNode`的缓存信息，如果任何一台`dataNode`缓存信息清除失败，这个权限更改的任务就会失败。
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 97f96e0f1c..85fa4750a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,6 +213,8 @@
         <awaitility.version>4.0.2</awaitility.version>
         <!-- JDK1.8 only support google java format 1.7-->
         <google.java.format.version>1.7</google.java.format.version>
+        <!-- caffeine cache -->
+        <caffeine>2.9.1</caffeine>
     </properties>
     <!--
         if we claim dependencies in dependencyManagement, then we do not claim
@@ -602,6 +604,11 @@
                 <artifactId>jetty-server</artifactId>
                 <version>${jetty-server.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.ben-manes.caffeine</groupId>
+                <artifactId>caffeine</artifactId>
+                <version>${caffeine}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <dependencies>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 0bd7c6a889..66c86c5ee3 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -832,6 +832,14 @@ timestamp_precision=ms
 
 #openID_url=
 
+# Cache size of user and role
+# Datatype: int
+# author_cache_size=1000
+
+# Cache expire time of user and role
+# Datatype: int
+# author_cache_expire_time=30
+
 ####################
 ### UDF Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index b2e58c3610..94c6a10001 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -21,16 +21,9 @@ package org.apache.iotdb.db.auth;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
-import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.AuthUtils;
-import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
-import org.apache.iotdb.db.client.ConfigNodeClient;
-import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.OperationType;
 import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
@@ -41,11 +34,9 @@ import org.apache.iotdb.rpc.ConfigNodeConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -60,10 +51,6 @@ public class AuthorityChecker {
   private static AuthorizerManager authorizerManager = AuthorizerManager.getInstance();
   private static SessionManager sessionManager = SessionManager.getInstance();
 
-  private static final IClientManager<PartitionRegionId, ConfigNodeClient> configNodeClientManager =
-      new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
-          .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
-
   private AuthorityChecker() {}
 
   /**
@@ -118,7 +105,7 @@ public class AuthorityChecker {
    */
   public static boolean checkPermission(
       String username, List<? extends PartialPath> paths, StatementType type, String targetUser)
-      throws ConfigNodeConnectionException {
+      throws AuthException, ConfigNodeConnectionException {
     if (SUPER_USER.equals(username)) {
       return true;
     }
@@ -141,7 +128,7 @@ public class AuthorityChecker {
       allPath.add(AuthUtils.ROOT_PATH_PRIVILEGE);
     }
 
-    TSStatus status = checkPath(username, allPath, permission);
+    TSStatus status = authorizerManager.checkPermissionCache(username, allPath, permission);
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return true;
     } else {
@@ -163,25 +150,6 @@ public class AuthorityChecker {
     return false;
   }
 
-  /** Check the user */
-  public static TSStatus checkUser(String username, String password)
-      throws ConfigNodeConnectionException {
-    TLoginReq req = new TLoginReq(username, password);
-    TSStatus status = null;
-    try (ConfigNodeClient configNodeClient =
-        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-      // Send request to some API server
-      status = configNodeClient.login(req);
-    } catch (TException | IOException e) {
-      throw new ConfigNodeConnectionException("Couldn't connect config node");
-    } finally {
-      if (status == null) {
-        status = new TSStatus();
-      }
-    }
-    return status;
-  }
-
   /** Check whether specific Session has the authorization to given plan. */
   public static TSStatus checkAuthority(Statement statement, long sessionId) {
     try {
@@ -214,24 +182,6 @@ public class AuthorityChecker {
         username, statement.getPaths(), statement.getType(), targetUser);
   }
 
-  public static TSStatus checkPath(String username, List<String> allPath, int permission)
-      throws ConfigNodeConnectionException {
-    TCheckUserPrivilegesReq req = new TCheckUserPrivilegesReq(username, allPath, permission);
-    TSStatus status = null;
-    try (ConfigNodeClient configNodeClient =
-        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-      // Send request to some API server
-      status = configNodeClient.checkUserPrivileges(req);
-    } catch (TException | IOException e) {
-      throw new ConfigNodeConnectionException("Couldn't connect config node");
-    } finally {
-      if (status == null) {
-        status = new TSStatus();
-      }
-    }
-    return status;
-  }
-
   private static int translateToPermissionId(Operator.OperatorType type) {
     switch (type) {
       case GRANT_ROLE_PRIVILEGE:
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityFetcher.java
similarity index 67%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
copy to server/src/main/java/org/apache/iotdb/db/auth/AuthorityFetcher.java
index 31be6557a2..78d92bb33e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityFetcher.java
@@ -17,29 +17,31 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.execution.config;
+package org.apache.iotdb.db.auth;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
+import org.apache.iotdb.db.mpp.plan.execution.config.AuthorizerConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.rpc.ConfigNodeConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -51,54 +53,52 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
-public class AuthorizerConfigTask implements IConfigTask {
+public class AuthorityFetcher {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(AuthorizerConfigTask.class);
+  private static final Logger logger = LoggerFactory.getLogger(AuthorizerConfigTask.class);
 
-  private AuthorStatement authorStatement;
+  private static final IClientManager<PartitionRegionId, ConfigNodeClient> configNodeClientManager =
+      new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
+          .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
 
-  public AuthorizerConfigTask(AuthorStatement authorStatement) {
-    this.authorStatement = authorStatement;
+  public static TPermissionInfoResp checkPath(String username, List<String> allPath, int permission)
+      throws ConfigNodeConnectionException {
+    TCheckUserPrivilegesReq req = new TCheckUserPrivilegesReq(username, allPath, permission);
+    TPermissionInfoResp status = null;
+    try (ConfigNodeClient configNodeClient =
+        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      // Send request to some API server
+      status = configNodeClient.checkUserPrivileges(req);
+    } catch (TException | IOException e) {
+      throw new ConfigNodeConnectionException("Couldn't connect config node");
+    } finally {
+      if (status == null) {
+        status = new TPermissionInfoResp();
+      }
+    }
+    return status;
   }
 
-  @Override
-  public ListenableFuture<ConfigTaskResult> execute(
-      IClientManager<PartitionRegionId, ConfigNodeClient> clientManager) {
-    SettableFuture<ConfigTaskResult> future = null;
+  /** Sends a login request for the given user and password to the config node. */
+  public static TPermissionInfoResp checkUser(String username, String password)
+      throws ConfigNodeConnectionException {
+    TLoginReq req = new TLoginReq(username, password);
+    TPermissionInfoResp status = null;
     try (ConfigNodeClient configNodeClient =
-        clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-      // Construct request using statement
-      TAuthorizerReq req =
-          new TAuthorizerReq(
-              authorStatement.getAuthorType().ordinal(),
-              authorStatement.getUserName() == null ? "" : authorStatement.getUserName(),
-              authorStatement.getRoleName() == null ? "" : authorStatement.getRoleName(),
-              authorStatement.getPassWord() == null ? "" : authorStatement.getPassWord(),
-              authorStatement.getNewPassword() == null ? "" : authorStatement.getNewPassword(),
-              AuthorPlan.strToPermissions(authorStatement.getPrivilegeList()),
-              authorStatement.getNodeName() == null
-                  ? ""
-                  : authorStatement.getNodeName().getFullPath());
-
+        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
       // Send request to some API server
-      if (authorStatement.getQueryType() == QueryType.WRITE) {
-        future = operatePermission(req, configNodeClient);
-      } else {
-        future = queryPermission(req, configNodeClient);
+      status = configNodeClient.login(req);
+    } catch (TException | IOException e) {
+      throw new ConfigNodeConnectionException("Couldn't connect config node");
+    } finally {
+      if (status == null) {
+        status = new TPermissionInfoResp();
       }
-    } catch (AuthException e) {
-      LOGGER.error("No such privilege {}.", authorStatement.getAuthorType());
-      future.setException(e);
-    } catch (IOException e) {
-      LOGGER.error("can't connect to all config nodes", e);
-      future.setException(e);
     }
-    // If the action is executed successfully, return the Future.
-    // If your operation is async, you can return the corresponding future directly.
-    return future;
+    return status;
   }
 
-  private SettableFuture<ConfigTaskResult> operatePermission(
+  public static SettableFuture<ConfigTaskResult> operatePermission(
       TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     try {
@@ -106,7 +106,7 @@ public class AuthorizerConfigTask implements IConfigTask {
       TSStatus tsStatus = configNodeClient.operatePermission(authorizerReq);
       // Get response or throw exception
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
-        LOGGER.error(
+        logger.error(
             "Failed to execute {} in config node, status is {}.",
             AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
                 .toString()
@@ -117,7 +117,7 @@ public class AuthorizerConfigTask implements IConfigTask {
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
       }
     } catch (TException e) {
-      LOGGER.error("Failed to connect to config node.");
+      logger.error("Failed to connect to config node.");
       future.setException(e);
     }
     // If the action is executed successfully, return the Future.
@@ -125,7 +125,7 @@ public class AuthorizerConfigTask implements IConfigTask {
     return future;
   }
 
-  private SettableFuture<ConfigTaskResult> queryPermission(
+  public static SettableFuture<ConfigTaskResult> queryPermission(
       TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     TAuthorizerResp authorizerResp;
@@ -134,7 +134,7 @@ public class AuthorizerConfigTask implements IConfigTask {
       authorizerResp = configNodeClient.queryPermission(authorizerReq);
       // Get response or throw exception
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != authorizerResp.getStatus().getCode()) {
-        LOGGER.error(
+        logger.error(
             "Failed to execute {} in config node, status is {}.",
             AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
                 .toString()
@@ -172,12 +172,8 @@ public class AuthorizerConfigTask implements IConfigTask {
             new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
       }
     } catch (TException e) {
-      LOGGER.error("Failed to connect to config node.");
+      logger.error("Failed to connect to config node.");
       future.setException(e);
-    } finally {
-      if (configNodeClient != null) {
-        configNodeClient.close();
-      }
     }
     // If the action is executed successfully, return the Future.
     // If your operation is async, you can return the corresponding future directly.
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java
index 7ea4e3f225..057f9f9d57 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java
@@ -19,21 +19,38 @@
 
 package org.apache.iotdb.db.auth;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.auth.authorizer.BasicAuthorizer;
 import org.apache.iotdb.commons.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.commons.auth.entity.PathPrivilege;
 import org.apache.iotdb.commons.auth.entity.Role;
 import org.apache.iotdb.commons.auth.entity.User;
-
+import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.rpc.ConfigNodeConnectionException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class AuthorizerManager implements IAuthorizer {
@@ -41,12 +58,25 @@ public class AuthorizerManager implements IAuthorizer {
   private static final Logger logger = LoggerFactory.getLogger(AuthorizerManager.class);
 
   private IAuthorizer iAuthorizer;
-  private ReentrantReadWriteLock snapshotLock;
+  private ReentrantReadWriteLock authReadWriteLock;
+  private IoTDBDescriptor conf = IoTDBDescriptor.getInstance();
+
+  private Cache<String, User> userCache =
+      Caffeine.newBuilder()
+          .maximumSize(conf.getConfig().getAuthorCacheSize())
+          .expireAfterAccess(conf.getConfig().getAuthorCacheExpireTime(), TimeUnit.MINUTES)
+          .build();
+
+  private Cache<String, Role> roleCache =
+      Caffeine.newBuilder()
+          .maximumSize(conf.getConfig().getAuthorCacheSize())
+          .expireAfterAccess(conf.getConfig().getAuthorCacheExpireTime(), TimeUnit.MINUTES)
+          .build();
 
   public AuthorizerManager() {
     try {
       iAuthorizer = BasicAuthorizer.getInstance();
-      snapshotLock = new ReentrantReadWriteLock();
+      authReadWriteLock = new ReentrantReadWriteLock();
     } catch (AuthException e) {
       logger.error(e.getMessage());
     }
@@ -65,146 +95,146 @@ public class AuthorizerManager implements IAuthorizer {
 
   @Override
   public boolean login(String username, String password) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.login(username, password);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public void createUser(String username, String password) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.createUser(username, password);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void deleteUser(String username) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.deleteUser(username);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void grantPrivilegeToUser(String username, String path, int privilegeId)
       throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.grantPrivilegeToUser(username, path, privilegeId);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void revokePrivilegeFromUser(String username, String path, int privilegeId)
       throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.revokePrivilegeFromUser(username, path, privilegeId);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void createRole(String roleName) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.createRole(roleName);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void deleteRole(String roleName) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.deleteRole(roleName);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void grantPrivilegeToRole(String roleName, String path, int privilegeId)
       throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.grantPrivilegeToRole(roleName, path, privilegeId);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void revokePrivilegeFromRole(String roleName, String path, int privilegeId)
       throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.revokePrivilegeFromRole(roleName, path, privilegeId);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void grantRoleToUser(String roleName, String username) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.grantRoleToUser(roleName, username);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void revokeRoleFromUser(String roleName, String username) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.revokeRoleFromUser(roleName, username);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public Set<Integer> getPrivileges(String username, String path) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.getPrivileges(username, path);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public void updateUserPassword(String username, String newPassword) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.updateUserPassword(username, newPassword);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public boolean checkUserPrivileges(String username, String path, int privilegeId)
       throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.checkUserPrivileges(username, path, privilegeId);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
@@ -215,131 +245,317 @@ public class AuthorizerManager implements IAuthorizer {
 
   @Override
   public List<String> listAllUsers() {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.listAllUsers();
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public List<String> listAllRoles() {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.listAllRoles();
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public Role getRole(String roleName) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.getRole(roleName);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public User getUser(String username) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.getUser(username);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public boolean isUserUseWaterMark(String userName) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.isUserUseWaterMark(userName);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public void setUserUseWaterMark(String userName, boolean useWaterMark) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock(); // setter: exclusive lock, like the other mutators
     try {
       iAuthorizer.setUserUseWaterMark(userName, useWaterMark);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public Map<String, Boolean> getAllUserWaterMarkStatus() {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.getAllUserWaterMarkStatus();
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public Map<String, User> getAllUsers() {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.getAllUsers();
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public Map<String, Role> getAllRoles() {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.readLock().lock();
     try {
       return iAuthorizer.getAllRoles();
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public void replaceAllUsers(Map<String, User> users) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock(); // replaces all users: exclusive, like other mutators
     try {
       iAuthorizer.replaceAllUsers(users);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void replaceAllRoles(Map<String, Role> roles) throws AuthException {
-    snapshotLock.readLock().lock();
+    authReadWriteLock.writeLock().lock(); // replaces all roles: exclusive, like other mutators
     try {
       iAuthorizer.replaceAllRoles(roles);
     } finally {
-      snapshotLock.readLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
-    snapshotLock.writeLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       return iAuthorizer.processTakeSnapshot(snapshotDir);
     } finally {
-      snapshotLock.writeLock().unlock();
+      authReadWriteLock.writeLock().unlock();
     }
   }
 
   @Override
   public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
-    snapshotLock.writeLock().lock();
+    authReadWriteLock.writeLock().lock();
     try {
       iAuthorizer.processLoadSnapshot(snapshotDir);
     } finally {
-      snapshotLock.writeLock().unlock();
+      authReadWriteLock.writeLock().unlock();
+    }
+  }
+
+  public TSStatus checkPermissionCache(String username, List<String> allPath, int permission)
+      throws AuthException, ConfigNodeConnectionException {
+    authReadWriteLock.readLock().lock(); // check the cache first; config node on a cache miss
+    try {
+      User user = userCache.getIfPresent(username); // null when the user is not cached
+      if (user != null) {
+        for (String path : allPath) {
+          if (!user.checkPrivilege(path, permission)) {
+            if (user.getRoleList().isEmpty()) {
+              return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR);
+            }
+            boolean status = false; // set by the privilege check of each role tried below
+            for (String roleName : user.getRoleList()) {
+              Role role = roleCache.getIfPresent(roleName);
+              // A role of this user is missing from the role cache, which indicates that the
+              // role's permission information has changed: drop the user's cached state and
+              // re-check through checkPath, which re-caches the user on success.
+              if (role == null) {
+                invalidateCache(username, "");
+                return checkPath(username, allPath, permission);
+              }
+              status = role.checkPrivilege(path, permission);
+              if (status) {
+                break;
+              }
+            }
+            if (!status) {
+              return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR);
+            }
+          }
+        }
+        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); // every path was permitted
+      } else {
+        return checkPath(username, allPath, permission); // cache miss: ask the config node
+      }
+    } finally {
+      authReadWriteLock.readLock().unlock();
+    }
+  }
+
+  public TSStatus checkPath(String username, List<String> allPath, int permission)
+      throws ConfigNodeConnectionException {
+    TPermissionInfoResp tPermissionInfoResp =
+        AuthorityFetcher.checkPath(username, allPath, permission); // asks the config node
+    if (tPermissionInfoResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      userCache.put(username, cacheUser(tPermissionInfoResp)); // cacheUser also caches the roles
+      return tPermissionInfoResp.getStatus();
+    } else {
+      return tPermissionInfoResp.getStatus(); // non-success status: nothing is cached
     }
   }
+
+  /** Checks the password against the cached user, or logs in via the config node on a miss. */
+  public TSStatus checkUser(String username, String password) throws ConfigNodeConnectionException {
+    authReadWriteLock.readLock().lock();
+    try {
+      User user = userCache.getIfPresent(username); // null when the user is not cached
+      if (user != null) {
+        if (password != null && AuthUtils.validatePassword(password, user.getPassword())) {
+          return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+        } else {
+          return RpcUtils.getStatus(
+              TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR, "Authentication failed.");
+        }
+      } else {
+        TPermissionInfoResp tPermissionInfoResp = AuthorityFetcher.checkUser(username, password);
+        if (tPermissionInfoResp.getStatus().getCode()
+            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          userCache.put(username, cacheUser(tPermissionInfoResp)); // cache only after success
+          return tPermissionInfoResp.getStatus();
+        } else {
+          return tPermissionInfoResp.getStatus(); // non-success status: nothing is cached
+        }
+      }
+    } finally {
+      authReadWriteLock.readLock().unlock();
+    }
+  }
+
+  public SettableFuture<ConfigTaskResult> queryPermission(
+      TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
+    authReadWriteLock.readLock().lock(); // held for the whole AuthorityFetcher call
+    try {
+      return AuthorityFetcher.queryPermission(authorizerReq, configNodeClient);
+    } finally {
+      authReadWriteLock.readLock().unlock();
+    }
+  }
+
+  public SettableFuture<ConfigTaskResult> operatePermission(
+      TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
+    authReadWriteLock.writeLock().lock(); // exclusive for the whole AuthorityFetcher call
+    try {
+      return AuthorityFetcher.operatePermission(authorizerReq, configNodeClient);
+    } finally {
+      authReadWriteLock.writeLock().unlock();
+    }
+  }
+
+  /** Builds a User from the response and puts every role of the response into the role cache. */
+  public User cacheUser(TPermissionInfoResp tPermissionInfoResp) {
+    User user = new User();
+    List<String> privilegeList = tPermissionInfoResp.getUserInfo().getPrivilegeList();
+    List<PathPrivilege> pathPrivilegeList = new ArrayList<>();
+    user.setName(tPermissionInfoResp.getUserInfo().getUsername());
+    user.setPassword(tPermissionInfoResp.getUserInfo().getPassword());
+    for (int i = 0; i < privilegeList.size(); i++) { // consumes two entries per iteration
+      String path = privilegeList.get(i);
+      String privilege = privilegeList.get(++i); // entry after a path is its privilege-id string
+      pathPrivilegeList.add(toPathPrivilege(path, privilege));
+    }
+    user.setPrivilegeList(pathPrivilegeList);
+    user.setRoleList(tPermissionInfoResp.getUserInfo().getRoleList());
+    for (String roleName : tPermissionInfoResp.getRoleInfo().keySet()) {
+      roleCache.put(roleName, cacheRole(roleName, tPermissionInfoResp));
+    }
+    return user; // not put into userCache here; the callers in this class do that
+  }
+
+  /** Builds a Role from the response's info for roleName; it is not put into a cache here. */
+  public Role cacheRole(String roleName, TPermissionInfoResp tPermissionInfoResp) {
+    Role role = new Role();
+    List<String> privilegeList = tPermissionInfoResp.getRoleInfo().get(roleName).getPrivilegeList();
+    List<PathPrivilege> pathPrivilegeList = new ArrayList<>();
+    role.setName(tPermissionInfoResp.getRoleInfo().get(roleName).getRoleName());
+    for (int i = 0; i < privilegeList.size(); i++) { // consumes two entries per iteration
+      String path = privilegeList.get(i);
+      String privilege = privilegeList.get(++i); // entry after a path is its privilege-id string
+      pathPrivilegeList.add(toPathPrivilege(path, privilege));
+    }
+    role.setPrivilegeList(pathPrivilegeList);
+    return role;
+  }
+
+  /**
+   * Invalidate the cache entries of the user, of every role the user holds, and of the role.
+   *
+   * <p>If the permission information of a role changes, only the role's cache entry is cleared.
+   * A later permission check that finds a role of a cached user missing from the role cache
+   * re-initializes that user.
+   */
+  public boolean invalidateCache(String username, String roleName) {
+    if (userCache.getIfPresent(username) != null) {
+      List<String> roleList = userCache.getIfPresent(username).getRoleList();
+      if (!roleList.isEmpty()) {
+        roleCache.invalidateAll(roleList); // drop the roles held by the user as well
+      }
+      userCache.invalidate(username);
+    }
+    if (roleCache.getIfPresent(roleName) != null) {
+      roleCache.invalidate(roleName);
+    }
+    if (userCache.getIfPresent(username) != null && roleCache.getIfPresent(roleName) != null) {
+      logger.error("datanode cache initialization failed");
+      return false; // NOTE(review): needs BOTH entries still cached; confirm "||" was not meant
+    }
+    return true;
+  }
+
+  /**
+   * Convert a path and its privilege-id string obtained from confignode to a PathPrivilege
+   *
+   * @param path permission path
+   * @param privilege comma-separated integer privilege ids; spaces are ignored
+   * @return a PathPrivilege carrying the path and the parsed set of privilege ids
+   */
+  private PathPrivilege toPathPrivilege(String path, String privilege) {
+    PathPrivilege pathPrivilege = new PathPrivilege();
+    String[] privileges = privilege.replace(" ", "").split(","); // strip spaces, then split ids
+    Set<Integer> privilegeIds = new HashSet<>();
+    for (String p : privileges) {
+      privilegeIds.add(Integer.parseInt(p)); // NumberFormatException if p is not an integer
+    }
+    pathPrivilege.setPrivileges(privilegeIds);
+    pathPrivilege.setPath(path);
+    return pathPrivilege;
+  }
+
+  public Cache<String, User> getUserCache() {
+    return userCache;
+  }
+
+  public Cache<String, Role> getRoleCache() {
+    return roleCache;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index f1a9f40bda..ebd6034542 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
@@ -524,11 +525,11 @@ public class ConfigNodeClient implements ConfigIService.Iface, SyncThriftClient,
   }
 
   @Override
-  public TSStatus login(TLoginReq req) throws TException {
+  public TPermissionInfoResp login(TLoginReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
       try {
-        TSStatus status = client.login(req);
-        if (!updateConfigNodeLeader(status)) {
+        TPermissionInfoResp status = client.login(req);
+        if (!updateConfigNodeLeader(status.getStatus())) {
           return status;
         }
       } catch (TException e) {
@@ -540,11 +541,11 @@ public class ConfigNodeClient implements ConfigIService.Iface, SyncThriftClient,
   }
 
   @Override
-  public TSStatus checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
+  public TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
       try {
-        TSStatus status = client.checkUserPrivileges(req);
-        if (!updateConfigNodeLeader(status)) {
+        TPermissionInfoResp status = client.checkUserPrivileges(req);
+        if (!updateConfigNodeLeader(status.getStatus())) {
           return status;
         }
       } catch (TException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index cf3e35f044..14f427eb7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -881,6 +881,12 @@ public class IoTDBConfig {
    */
   private int partitionCacheSize = 10000;
 
+  /** Maximum number of entries in each of the user cache and the role cache */
+  private int authorCacheSize = 100;
+
+  /** Expire-after-access time of the user and role caches, in minutes */
+  private int authorCacheExpireTime = 30;
+
   IoTDBConfig() {}
 
   public float getUdfMemoryBudgetInMB() {
@@ -2760,4 +2766,20 @@ public class IoTDBConfig {
   public void setPartitionCacheSize(int partitionCacheSize) {
     this.partitionCacheSize = partitionCacheSize;
   }
+
+  public int getAuthorCacheSize() {
+    return authorCacheSize;
+  }
+
+  public void setAuthorCacheSize(int authorCacheSize) {
+    this.authorCacheSize = authorCacheSize;
+  }
+
+  public int getAuthorCacheExpireTime() {
+    return authorCacheExpireTime;
+  }
+
+  public void setAuthorCacheExpireTime(int authorCacheExpireTime) {
+    this.authorCacheExpireTime = authorCacheExpireTime;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f40850fef2..d5d9fc6c78 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -887,6 +887,9 @@ public class IoTDBDescriptor {
 
       // shuffle
       loadShuffleProps(properties);
+
+      // author cache
+      loadAuthorCache(properties);
     } catch (FileNotFoundException e) {
       logger.warn("Fail to find config file {}", url, e);
     } catch (IOException e) {
@@ -900,6 +903,17 @@ public class IoTDBDescriptor {
     }
   }
 
+  /** Loads author cache size and expire time, defaulting to the values already in conf. */
+  private void loadAuthorCache(Properties properties) {
+    String cacheSize =
+        properties.getProperty("author_cache_size", String.valueOf(conf.getAuthorCacheSize()));
+    conf.setAuthorCacheSize(Integer.parseInt(cacheSize));
+    String expireTime =
+        properties.getProperty(
+            "author_cache_expire_time", String.valueOf(conf.getAuthorCacheExpireTime()));
+    conf.setAuthorCacheExpireTime(Integer.parseInt(expireTime));
+  }
+
   // to keep consistent with the cluster module.
   private void replaceHostnameWithIP() throws UnknownHostException, BadNodeUrlFormatException {
     boolean isInvalidRpcIp = InetAddresses.isInetAddress(conf.getRpcAddress());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
index 31be6557a2..3193464919 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
@@ -19,43 +19,30 @@
 
 package org.apache.iotdb.db.mpp.plan.execution.config;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
-import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
+import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
-import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
 
 public class AuthorizerConfigTask implements IConfigTask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AuthorizerConfigTask.class);
 
   private AuthorStatement authorStatement;
+  private AuthorizerManager authorizerManager = AuthorizerManager.getInstance();
 
   public AuthorizerConfigTask(AuthorStatement authorStatement) {
     this.authorStatement = authorStatement;
@@ -82,9 +69,9 @@ public class AuthorizerConfigTask implements IConfigTask {
 
       // Send request to some API server
       if (authorStatement.getQueryType() == QueryType.WRITE) {
-        future = operatePermission(req, configNodeClient);
+        future = authorizerManager.operatePermission(req, configNodeClient);
       } else {
-        future = queryPermission(req, configNodeClient);
+        future = authorizerManager.queryPermission(req, configNodeClient);
       }
     } catch (AuthException e) {
       LOGGER.error("No such privilege {}.", authorStatement.getAuthorType());
@@ -97,90 +84,4 @@ public class AuthorizerConfigTask implements IConfigTask {
     // If your operation is async, you can return the corresponding future directly.
     return future;
   }
-
-  private SettableFuture<ConfigTaskResult> operatePermission(
-      TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
-    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    try {
-      // Send request to some API server
-      TSStatus tsStatus = configNodeClient.operatePermission(authorizerReq);
-      // Get response or throw exception
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
-        LOGGER.error(
-            "Failed to execute {} in config node, status is {}.",
-            AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
-                .toString()
-                .toLowerCase(Locale.ROOT),
-            tsStatus);
-        future.setException(new StatementExecutionException(tsStatus));
-      } else {
-        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
-      }
-    } catch (TException e) {
-      LOGGER.error("Failed to connect to config node.");
-      future.setException(e);
-    }
-    // If the action is executed successfully, return the Future.
-    // If your operation is async, you can return the corresponding future directly.
-    return future;
-  }
-
-  private SettableFuture<ConfigTaskResult> queryPermission(
-      TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
-    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    TAuthorizerResp authorizerResp;
-    try {
-      // Send request to some API server
-      authorizerResp = configNodeClient.queryPermission(authorizerReq);
-      // Get response or throw exception
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != authorizerResp.getStatus().getCode()) {
-        LOGGER.error(
-            "Failed to execute {} in config node, status is {}.",
-            AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
-                .toString()
-                .toLowerCase(Locale.ROOT),
-            authorizerResp.getStatus());
-        future.setException(new StatementExecutionException(authorizerResp.getStatus()));
-      } else {
-        // build TSBlock
-        List<TSDataType> types = new ArrayList<>();
-        Map<String, List<String>> authorizerInfo = authorizerResp.getAuthorizerInfo();
-        for (int i = 0; i < authorizerInfo.size(); i++) {
-          types.add(TSDataType.TEXT);
-        }
-        TsBlockBuilder builder = new TsBlockBuilder(types);
-        List<ColumnHeader> headerList = new ArrayList<>();
-
-        for (String header : authorizerInfo.keySet()) {
-          headerList.add(new ColumnHeader(header, TSDataType.TEXT));
-        }
-        // The Time column will be ignored by the setting of ColumnHeader.
-        // So we can put a meaningless value here
-        for (String value : authorizerInfo.get(headerList.get(0).getColumnName())) {
-          builder.getTimeColumnBuilder().writeLong(0L);
-          builder.getColumnBuilder(0).writeBinary(new Binary(value));
-          builder.declarePosition();
-        }
-        for (int i = 1; i < headerList.size(); i++) {
-          for (String value : authorizerInfo.get(headerList.get(i).getColumnName())) {
-            builder.getColumnBuilder(i).writeBinary(new Binary(value));
-          }
-        }
-
-        DatasetHeader datasetHeader = new DatasetHeader(headerList, true);
-        future.set(
-            new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
-      }
-    } catch (TException e) {
-      LOGGER.error("Failed to connect to config node.");
-      future.setException(e);
-    } finally {
-      if (configNodeClient != null) {
-        configNodeClient.close();
-      }
-    }
-    // If the action is executed successfully, return the Future.
-    // If your operation is async, you can return the corresponding future directly.
-    return future;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 86bc50adef..2823642630 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
@@ -151,7 +152,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
     IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
     TSStatus loginStatus;
     try {
-      loginStatus = AuthorityChecker.checkUser(req.username, req.password);
+      loginStatus = AuthorizerManager.getInstance().checkUser(req.username, req.password);
     } catch (ConfigNodeConnectionException e) {
       TSStatus tsStatus = RpcUtils.getStatus(TSStatusCode.AUTHENTICATION_ERROR, e.getMessage());
       return new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 93e679610f..4c7f08edf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.consensus.ConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.exception.DataRegionException;
@@ -75,12 +76,14 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMigrateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMigrateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
@@ -325,6 +328,14 @@ public class InternalServiceImpl implements InternalService.Iface {
     return result;
   }
 
+  @Override
+  public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) throws TException {
+    // Report the outcome of the cache invalidation as an RPC status.
+    return AuthorizerManager.getInstance().invalidateCache(req.getUsername(), req.getRoleName())
+        ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+        : RpcUtils.getStatus(TSStatusCode.INVALIDATE_PERMISSION_CACHE_ERROR);
+  }
+
   @Override
   public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) throws TException {
     long queryIdRaw = SessionManager.getInstance().requestQueryId(false);
diff --git a/server/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java b/server/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java
new file mode 100644
index 0000000000..2dfa147efc
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.auth;
+
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.auth.entity.PathPrivilege;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.auth.entity.Role;
+import org.apache.iotdb.commons.auth.entity.User;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRoleResp;
+import org.apache.iotdb.confignode.rpc.thrift.TUserResp;
+import org.apache.iotdb.rpc.ConfigNodeConnectionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Unit test for the user/role permission cache kept by {@link AuthorizerManager}. */
+public class AuthorizerManagerTest {
+
+  AuthorizerManager authorizerManager = AuthorizerManager.getInstance();
+
+  /** Verifies permission checks against cached user/role info, then cache invalidation. */
+  @Test
+  public void permissionCacheTest() throws ConfigNodeConnectionException, AuthException {
+    User user = new User();
+    Role role1 = new Role();
+    Role role2 = new Role();
+    List<String> roleList = new ArrayList<>();
+    Set<Integer> privilegesIds = new HashSet<>();
+    PathPrivilege privilege = new PathPrivilege();
+    List<PathPrivilege> privilegeList = new ArrayList<>();
+    privilegesIds.add(PrivilegeType.CREATE_ROLE.ordinal());
+    privilegesIds.add(PrivilegeType.REVOKE_USER_ROLE.ordinal());
+    privilege.setPath("root.ln");
+    privilege.setPrivileges(privilegesIds);
+    privilegeList.add(privilege);
+    role1.setName("role1");
+    role1.setPrivilegeList(privilegeList);
+    role2.setName("role2");
+    role2.setPrivilegeList(new ArrayList<>());
+    roleList.add("role1");
+    roleList.add("role2");
+    user.setName("user");
+    user.setPassword("password");
+    user.setPrivilegeList(privilegeList);
+    user.setRoleList(roleList);
+    TPermissionInfoResp result = new TPermissionInfoResp();
+    TUserResp tUserResp = new TUserResp();
+    Map<String, TRoleResp> tRoleRespMap = new HashMap<>();
+    List<String> userPrivilegeList = new ArrayList<>();
+
+    // User permission information
+    for (PathPrivilege pathPrivilege : user.getPrivilegeList()) {
+      userPrivilegeList.add(pathPrivilege.getPath());
+      String privilegeIdList = pathPrivilege.getPrivileges().toString();
+      userPrivilegeList.add(privilegeIdList.substring(1, privilegeIdList.length() - 1));
+    }
+    tUserResp.setUsername(user.getName());
+    tUserResp.setPassword(user.getPassword());
+    tUserResp.setPrivilegeList(userPrivilegeList);
+    tUserResp.setRoleList(new ArrayList<>());
+    result.setUserInfo(tUserResp);
+    result.setRoleInfo(new HashMap<>());
+
+    // User authentication permission without role
+    authorizerManager.getUserCache().put(user.getName(), authorizerManager.cacheUser(result));
+    User user1 = authorizerManager.getUserCache().getIfPresent(user.getName());
+    Assert.assertNotNull(user1);
+    Assert.assertEquals(user.getName(), user1.getName());
+    Assert.assertEquals(user.getPassword(), user1.getPassword());
+    Assert.assertEquals(user.getPrivilegeList(), user1.getPrivilegeList());
+
+    // User has permission
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        authorizerManager
+            .checkPermissionCache(
+                "user", Collections.singletonList("root.ln"), PrivilegeType.CREATE_ROLE.ordinal())
+            .getCode());
+    // User does not have permission
+    Assert.assertEquals(
+        TSStatusCode.NO_PERMISSION_ERROR.getStatusCode(),
+        authorizerManager
+            .checkPermissionCache(
+                "user", Collections.singletonList("root.ln"), PrivilegeType.CREATE_USER.ordinal())
+            .getCode());
+
+    // Authenticate users with roles
+    authorizerManager.invalidateCache(user.getName(), "");
+    tUserResp.setPrivilegeList(new ArrayList<>());
+    tUserResp.setRoleList(user.getRoleList());
+
+    // Permission information for roles owned by users
+    for (Role role : new Role[] {role1, role2}) {
+      TRoleResp tRoleResp = new TRoleResp();
+      List<String> rolePrivilegeList = new ArrayList<>();
+      tRoleResp.setRoleName(role.getName());
+      for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+        rolePrivilegeList.add(pathPrivilege.getPath());
+        String privilegeIdList = pathPrivilege.getPrivileges().toString();
+        rolePrivilegeList.add(privilegeIdList.substring(1, privilegeIdList.length() - 1));
+      }
+      tRoleResp.setPrivilegeList(rolePrivilegeList);
+      tRoleRespMap.put(role.getName(), tRoleResp);
+    }
+    result.setRoleInfo(tRoleRespMap);
+    authorizerManager.getUserCache().put(user.getName(), authorizerManager.cacheUser(result));
+    Role role3 = authorizerManager.getRoleCache().getIfPresent(role1.getName());
+    // Fail with a clear assertion instead of an NPE if the role was not cached.
+    Assert.assertNotNull(role3);
+    Assert.assertEquals(role1.getName(), role3.getName());
+    Assert.assertEquals(role1.getPrivilegeList(), role3.getPrivilegeList());
+
+    // role has permission
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        authorizerManager
+            .checkPermissionCache(
+                "user", Collections.singletonList("root.ln"), PrivilegeType.CREATE_ROLE.ordinal())
+            .getCode());
+    // role does not have permission
+    Assert.assertEquals(
+        TSStatusCode.NO_PERMISSION_ERROR.getStatusCode(),
+        authorizerManager
+            .checkPermissionCache(
+                "user", Collections.singletonList("root.ln"), PrivilegeType.CREATE_USER.ordinal())
+            .getCode());
+
+    authorizerManager.invalidateCache(user.getName(), "");
+
+    user1 = authorizerManager.getUserCache().getIfPresent(user.getName());
+    role1 = authorizerManager.getRoleCache().getIfPresent(role1.getName());
+
+    Assert.assertNull(user1);
+    Assert.assertNull(role1);
+  }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 08146da4b4..6d72f1d7be 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -104,6 +104,7 @@ public enum TSStatusCode {
   USER_NOT_EXIST_ERROR(605),
   ROLE_NOT_EXIST_ERROR(606),
   AUTHENTICATION_ERROR(607),
+  INVALIDATE_PERMISSION_CACHE_ERROR(608),
 
   // cluster-related errors
   PARTITION_NOT_READY(700),
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 4b933e548f..f66e743b96 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -164,6 +164,24 @@ struct TAuthorizerResp {
   2: required map<string, list<string>> authorizerInfo
 }
 
+struct TUserResp{
+  1: required string username
+  2: required string password
+  3: required list<string> privilegeList
+  4: required list<string> roleList
+}
+
+struct TRoleResp{
+  1: required string roleName
+  2: required list<string> privilegeList
+}
+
+struct TPermissionInfoResp{
+  1: required TUserResp userInfo
+  2: required map<string, TRoleResp> roleInfo
+  3: required common.TSStatus status
+}
+
 struct TLoginReq {
   1: required string userrname
   2: required string password
@@ -243,9 +261,9 @@ service ConfigIService {
 
   TAuthorizerResp queryPermission(TAuthorizerReq req)
 
-  common.TSStatus login(TLoginReq req)
+  TPermissionInfoResp login(TLoginReq req)
 
-  common.TSStatus checkUserPrivileges(TCheckUserPrivilegesReq req)
+  TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req)
 
   /* ConfigNode */
 
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index dfc26e4da0..d55520a9f4 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -135,6 +135,11 @@ struct TSchemaFetchResponse {
   1: required binary serializedSchemaTree
 }
 
+struct TInvalidatePermissionCacheReq{
+  1: required string username
+  2: required string roleName
+}
+
 service InternalService {
 
   // -----------------------------------For Data Node-----------------------------------------------
@@ -210,6 +215,13 @@ service InternalService {
   * @param ConfigNode will send the latest config_node_list and load balancing policies in THeartbeatReq
   **/
   common.THeartbeatResp getHeartBeat(common.THeartbeatReq req)
+
+  /**
+   * Called by the config node to invalidate the permission info cache.
+   *
+   * @param TInvalidatePermissionCacheReq: string username, string roleName
+   */
+  common.TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req)
 }
 
 service DataBlockService {