You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/25 04:42:54 UTC
[iotdb] branch master updated: [IOTDB-3083] DataNode AuthorInfo cache (#5943)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c115adde55 [IOTDB-3083] DataNode AuthorInfo cache (#5943)
c115adde55 is described below
commit c115adde559c04e60bd0ac158afc982519c9d30a
Author: 任宇华 <79...@users.noreply.github.com>
AuthorDate: Wed May 25 12:42:47 2022 +0800
[IOTDB-3083] DataNode AuthorInfo cache (#5943)
---
.../iotdb/confignode/manager/ConfigManager.java | 14 +-
.../apache/iotdb/confignode/manager/Manager.java | 5 +-
.../confignode/manager/PermissionManager.java | 79 ++++-
.../iotdb/confignode/persistence/AuthorInfo.java | 96 +++++-
.../thrift/ConfigNodeRPCServiceProcessor.java | 5 +-
.../confignode/persistence/AuthorInfoTest.java | 10 +-
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 4 +-
.../Administration-Management/Administration.md | 6 +-
.../Administration-Management/Administration.md | 6 +-
pom.xml | 7 +
.../resources/conf/iotdb-engine.properties | 8 +
.../org/apache/iotdb/db/auth/AuthorityChecker.java | 54 +---
.../AuthorityFetcher.java} | 100 +++----
.../apache/iotdb/db/auth/AuthorizerManager.java | 330 +++++++++++++++++----
.../apache/iotdb/db/client/ConfigNodeClient.java | 13 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 14 +
.../execution/config/AuthorizerConfigTask.java | 107 +------
.../thrift/impl/DataNodeTSIServiceImpl.java | 3 +-
.../service/thrift/impl/InternalServiceImpl.java | 11 +
.../iotdb/db/auth/AuthorizerManagerTest.java | 164 ++++++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../src/main/thrift/confignode.thrift | 22 +-
thrift/src/main/thrift/mpp.thrift | 12 +
24 files changed, 790 insertions(+), 303 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 9b2001ee0a..34477bbd6c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.persistence.executor.ConfigRequestExecutor;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
@@ -503,22 +504,27 @@ public class ConfigManager implements Manager {
}
@Override
- public TSStatus login(String username, String password) {
+ public TPermissionInfoResp login(String username, String password) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return permissionManager.login(username, password);
} else {
- return status;
+ TPermissionInfoResp permissionInfoResp = new TPermissionInfoResp();
+ permissionInfoResp.setStatus(status);
+ return permissionInfoResp;
}
}
@Override
- public TSStatus checkUserPrivileges(String username, List<String> paths, int permission) {
+ public TPermissionInfoResp checkUserPrivileges(
+ String username, List<String> paths, int permission) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return permissionManager.checkUserPrivileges(username, paths, permission);
} else {
- return status;
+ TPermissionInfoResp permissionInfoResp = new TPermissionInfoResp();
+ permissionInfoResp.setStatus(status);
+ return permissionInfoResp;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index f496541603..16a07d050a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionInter
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
@@ -199,10 +200,10 @@ public interface Manager {
DataSet queryPermission(ConfigRequest configRequest);
/** login */
- TSStatus login(String username, String password);
+ TPermissionInfoResp login(String username, String password);
/** Check User Privileges */
- TSStatus checkUserPrivileges(String username, List<String> paths, int permission);
+ TPermissionInfoResp checkUserPrivileges(String username, List<String> paths, int permission);
/**
* Register ConfigNode when it is first startup
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 7cbae5a3d8..d27060c5f9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -19,20 +19,42 @@
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
import org.apache.iotdb.confignode.persistence.AuthorInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
import java.util.List;
/** manager permission query and operation */
public class PermissionManager {
- private final Manager configManager;
+ private static final Logger logger = LoggerFactory.getLogger(PermissionManager.class);
+
+ private final ConfigManager configManager;
private final AuthorInfo authorInfo;
+ private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ INTERNAL_SERVICE_CLIENT_MANAGER =
+ new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
- public PermissionManager(Manager configManager, AuthorInfo authorInfo) {
+ public PermissionManager(ConfigManager configManager, AuthorInfo authorInfo) {
this.configManager = configManager;
this.authorInfo = authorInfo;
}
@@ -44,7 +66,18 @@ public class PermissionManager {
* @return TSStatus
*/
public TSStatus operatePermission(AuthorReq authorReq) {
- return getConsensusManager().write(authorReq).getStatus();
+ TSStatus tsStatus;
+ // If the permissions change, clear the cache content affected by the operation
+ if (authorReq.getAuthorType() == ConfigRequestType.CreateUser
+ || authorReq.getAuthorType() == ConfigRequestType.CreateRole) {
+ tsStatus = getConsensusManager().write(authorReq).getStatus();
+ } else {
+ tsStatus = invalidateCache(authorReq.getUserName(), authorReq.getRoleName());
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ tsStatus = getConsensusManager().write(authorReq).getStatus();
+ }
+ }
+ return tsStatus;
}
/**
@@ -61,11 +94,47 @@ public class PermissionManager {
return configManager.getConsensusManager();
}
- public TSStatus login(String username, String password) {
+ public TPermissionInfoResp login(String username, String password) {
return authorInfo.login(username, password);
}
- public TSStatus checkUserPrivileges(String username, List<String> paths, int permission) {
+ public TPermissionInfoResp checkUserPrivileges(
+ String username, List<String> paths, int permission) {
return authorInfo.checkUserPrivileges(username, paths, permission);
}
+
+ /**
+ * When the permission information of a user or role is changed, clear the cached permissions
+ * related to that user or role on all online DataNodes
+ */
+ public TSStatus invalidateCache(String username, String roleName) {
+ List<TDataNodeInfo> allDataNodes = configManager.getNodeManager().getOnlineDataNodes(-1);
+ TInvalidatePermissionCacheReq req = new TInvalidatePermissionCacheReq();
+ TSStatus status;
+ req.setUsername(username);
+ req.setRoleName(roleName);
+ for (TDataNodeInfo dataNodeInfo : allDataNodes) {
+ TEndPoint internalEndPoint = dataNodeInfo.getLocation().getInternalEndPoint();
+ try {
+ status =
+ INTERNAL_SERVICE_CLIENT_MANAGER
+ .borrowClient(internalEndPoint)
+ .invalidatePermissionCache(req);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ status.setMessage(
+ "datanode cache initialization failed, ip: "
+ + internalEndPoint.getIp()
+ + ", port: "
+ + internalEndPoint.getPort());
+ return status;
+ }
+ } catch (IOException | TException e) {
+ logger.error("Failed to initialize cache, the error is {}", e);
+ return RpcUtils.getStatus(
+ TSStatusCode.INVALIDATE_PERMISSION_CACHE_ERROR,
+ "Failed to initialize cache, the error is " + e.getMessage());
+ }
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
index 3c8563b10f..40df733487 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
@@ -36,6 +36,9 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRoleResp;
+import org.apache.iotdb.confignode.rpc.thrift.TUserResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -58,7 +61,7 @@ public class AuthorInfo implements SnapshotProcessor {
private IAuthorizer authorizer;
- {
+ public AuthorInfo() {
try {
authorizer = BasicAuthorizer.getInstance();
} catch (AuthException e) {
@@ -66,29 +69,40 @@ public class AuthorInfo implements SnapshotProcessor {
}
}
- public TSStatus login(String username, String password) {
+ public TPermissionInfoResp login(String username, String password) {
boolean status;
String loginMessage = null;
TSStatus tsStatus = new TSStatus();
+ TPermissionInfoResp result = new TPermissionInfoResp();
try {
status = authorizer.login(username, password);
+ if (status) {
+ // Bring this user's permission information back to the datanode for caching
+ result = getUserPermissionInfo(username);
+ result.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully"));
+ } else {
+ result.setUserInfo(new TUserResp("", "", new ArrayList<>(), new ArrayList<>()));
+ Map<String, TRoleResp> roleInfo = new HashMap<>();
+ roleInfo.put("", new TRoleResp("", new ArrayList<>()));
+ result.setRoleInfo(roleInfo);
+ }
} catch (AuthException e) {
- logger.info("meet error while logging in.", e);
+ logger.error("meet error while logging in.", e);
status = false;
loginMessage = e.getMessage();
}
- if (status) {
- tsStatus.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- tsStatus.setMessage("Login successfully");
- } else {
+ if (!status) {
tsStatus.setMessage(loginMessage != null ? loginMessage : "Authentication failed.");
tsStatus.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.getStatusCode());
+ result.setStatus(tsStatus);
}
- return tsStatus;
+ return result;
}
- public TSStatus checkUserPrivileges(String username, List<String> paths, int permission) {
+ public TPermissionInfoResp checkUserPrivileges(
+ String username, List<String> paths, int permission) {
boolean status = true;
+ TPermissionInfoResp result = new TPermissionInfoResp();
try {
for (String path : paths) {
if (!checkOnePath(username, path, permission)) {
@@ -99,9 +113,23 @@ public class AuthorInfo implements SnapshotProcessor {
status = false;
}
if (status) {
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ try {
+ // Bring this user's permission information back to the datanode for caching
+ result = getUserPermissionInfo(username);
+ result.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ } catch (AuthException e) {
+ result.setStatus(
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_PERMISSION_EXCEPTION_ERROR, e.getMessage()));
+ return result;
+ }
+ return result;
} else {
- return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR);
+ result.setUserInfo(new TUserResp("", "", new ArrayList<>(), new ArrayList<>()));
+ Map<String, TRoleResp> roleInfo = new HashMap<>();
+ roleInfo.put("", new TRoleResp("", new ArrayList<>()));
+ result.setRoleInfo(roleInfo);
+ result.setStatus(RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR));
+ return result;
}
}
@@ -362,4 +390,50 @@ public class AuthorInfo implements SnapshotProcessor {
}
authorizer.reset();
}
+
+ /**
+ * Collect the user's permission information so it can be returned to the DataNode for caching
+ *
+ * @param username The username of the user that needs to be cached
+ */
+ public TPermissionInfoResp getUserPermissionInfo(String username) throws AuthException {
+ TPermissionInfoResp result = new TPermissionInfoResp();
+ TUserResp tUserResp = new TUserResp();
+ TRoleResp tRoleResp = new TRoleResp();
+ Map<String, TRoleResp> tRoleRespMap = new HashMap();
+ List<String> userPrivilegeList = new ArrayList<>();
+ List<String> rolePrivilegeList = new ArrayList<>();
+
+ // User permission information
+ User user = authorizer.getUser(username);
+ if (user.getPrivilegeList() != null) {
+ for (PathPrivilege pathPrivilege : user.getPrivilegeList()) {
+ userPrivilegeList.add(pathPrivilege.getPath());
+ String privilegeIdList = pathPrivilege.getPrivileges().toString();
+ userPrivilegeList.add(privilegeIdList.substring(1, privilegeIdList.length() - 1));
+ }
+ tUserResp.setUsername(user.getName());
+ tUserResp.setPassword(user.getPassword());
+ tUserResp.setPrivilegeList(userPrivilegeList);
+ tUserResp.setRoleList(user.getRoleList());
+ }
+
+ // Permission information for roles owned by users
+ if (user.getRoleList() != null) {
+ for (String roleName : user.getRoleList()) {
+ Role role = authorizer.getRole(roleName);
+ tRoleResp.setRoleName(roleName);
+ for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+ rolePrivilegeList.add(pathPrivilege.getPath());
+ String privilegeIdList = pathPrivilege.getPrivileges().toString();
+ rolePrivilegeList.add(privilegeIdList.substring(1, privilegeIdList.length() - 1));
+ }
+ tRoleResp.setPrivilegeList(rolePrivilegeList);
+ tRoleRespMap.put(roleName, tRoleResp);
+ }
+ }
+ result.setUserInfo(tUserResp);
+ result.setRoleInfo(tRoleRespMap);
+ return result;
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 3b20df5131..6cc00fb56f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
@@ -356,12 +357,12 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
}
@Override
- public TSStatus login(TLoginReq req) throws TException {
+ public TPermissionInfoResp login(TLoginReq req) throws TException {
return configManager.login(req.getUserrname(), req.getPassword());
}
@Override
- public TSStatus checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
+ public TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
return configManager.checkUserPrivileges(
req.getUsername(), req.getPaths(), req.getPermission());
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
index 1bf52a65b3..2e2d96ef3f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
@@ -112,7 +112,10 @@ public class AuthorInfoTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// check user privileges
- status = authorInfo.checkUserPrivileges("user0", paths, PrivilegeType.DELETE_USER.ordinal());
+ status =
+ authorInfo
+ .checkUserPrivileges("user0", paths, PrivilegeType.DELETE_USER.ordinal())
+ .getStatus();
Assert.assertEquals(TSStatusCode.NO_PERMISSION_ERROR.getStatusCode(), status.getCode());
// drop user
@@ -163,7 +166,10 @@ public class AuthorInfoTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// check user privileges
- status = authorInfo.checkUserPrivileges("user0", paths, PrivilegeType.DELETE_USER.ordinal());
+ status =
+ authorInfo
+ .checkUserPrivileges("user0", paths, PrivilegeType.DELETE_USER.ordinal())
+ .getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// grant role
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 371993af04..929145250b 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -611,7 +611,7 @@ public class ConfigNodeRPCServiceProcessorTest {
// check user privileges
checkUserPrivilegesReq =
new TCheckUserPrivilegesReq("tempuser0", paths, PrivilegeType.DELETE_USER.ordinal());
- status = processor.checkUserPrivileges(checkUserPrivilegesReq);
+ status = processor.checkUserPrivileges(checkUserPrivilegesReq).getStatus();
Assert.assertEquals(TSStatusCode.NO_PERMISSION_ERROR.getStatusCode(), status.getCode());
// drop user
@@ -707,7 +707,7 @@ public class ConfigNodeRPCServiceProcessorTest {
// check user privileges
checkUserPrivilegesReq =
new TCheckUserPrivilegesReq("tempuser0", paths, PrivilegeType.DELETE_USER.ordinal());
- status = processor.checkUserPrivileges(checkUserPrivilegesReq);
+ status = processor.checkUserPrivileges(checkUserPrivilegesReq).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// grant role
diff --git a/docs/UserGuide/Administration-Management/Administration.md b/docs/UserGuide/Administration-Management/Administration.md
index 6411080736..ae91051e00 100644
--- a/docs/UserGuide/Administration-Management/Administration.md
+++ b/docs/UserGuide/Administration-Management/Administration.md
@@ -343,4 +343,8 @@ IoTDB specifies that the character length of a role name should have no less tha
A path pattern's result set contains all the elements of its sub pattern's
result set. For example, `root.sg.d.*` is a sub pattern of
`root.sg.*.*`, while `root.sg.**` is not a sub pattern of
-`root.sg.*.*`. When a user is granted privilege on a pattern, the pattern used in his DDL or DML must be a sub pattern of the privilege pattern, which guarantees that the user won't access the timeseries exceed his privilege scope.
\ No newline at end of file
+`root.sg.*.*`. When a user is granted privilege on a pattern, the pattern used in his DDL or DML must be a sub pattern of the privilege pattern, which guarantees that the user won't access the timeseries exceed his privilege scope.
+
+### Permission cache
+
+In distributed permission operations, any permission change other than creating a user or role first clears the cached information related to that user (role) on every `dataNode`. If clearing the cache fails on any `dataNode`, the permission change task fails.
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Administration-Management/Administration.md b/docs/zh/UserGuide/Administration-Management/Administration.md
index 74a46ea08a..2b1f3a0374 100644
--- a/docs/zh/UserGuide/Administration-Management/Administration.md
+++ b/docs/zh/UserGuide/Administration-Management/Administration.md
@@ -339,4 +339,8 @@ IoTDB 规定角色名的字符长度不小于 4,其中角色名不能包含空
### 权限管理中的路径模式
-一个路径模式的结果集包含了它的子模式的结果集的所有元素。例如,`root.sg.d.*`是`root.sg.*.*`的子模式,而`root.sg.**`不是`root.sg.*.*`的子模式。当用户被授予对某个路径模式的权限时,在他的DDL或DML中使用的模式必须是该路径模式的子模式,这保证了用户访问时间序列时不会超出他的权限范围。
\ No newline at end of file
+一个路径模式的结果集包含了它的子模式的结果集的所有元素。例如,`root.sg.d.*`是`root.sg.*.*`的子模式,而`root.sg.**`不是`root.sg.*.*`的子模式。当用户被授予对某个路径模式的权限时,在他的DDL或DML中使用的模式必须是该路径模式的子模式,这保证了用户访问时间序列时不会超出他的权限范围。
+
+### 权限缓存
+
+在分布式相关的权限操作中,在进行除了创建用户和角色之外的其他权限更改操作时,都会先清除与该用户(角色)相关的所有的`dataNode`的缓存信息,如果任何一台`dataNode`缓存信息清除失败,这个权限更改的任务就会失败。
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 97f96e0f1c..85fa4750a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,6 +213,8 @@
<awaitility.version>4.0.2</awaitility.version>
<!-- JDK1.8 only support google java format 1.7-->
<google.java.format.version>1.7</google.java.format.version>
+ <!-- caffeine cache -->
+ <caffeine>2.9.1</caffeine>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
@@ -602,6 +604,11 @@
<artifactId>jetty-server</artifactId>
<version>${jetty-server.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>${caffeine}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
<dependencies>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 0bd7c6a889..66c86c5ee3 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -832,6 +832,14 @@ timestamp_precision=ms
#openID_url=
+# Cache size of user and role
+# Datatype: int
+# author_cache_size=1000
+
+# Cache expire time of user and role
+# Datatype: int
+# author_cache_expire_time=30
+
####################
### UDF Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index b2e58c3610..94c6a10001 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -21,16 +21,9 @@ package org.apache.iotdb.db.auth;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
-import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.AuthUtils;
-import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
-import org.apache.iotdb.db.client.ConfigNodeClient;
-import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.OperationType;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
@@ -41,11 +34,9 @@ import org.apache.iotdb.rpc.ConfigNodeConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -60,10 +51,6 @@ public class AuthorityChecker {
private static AuthorizerManager authorizerManager = AuthorizerManager.getInstance();
private static SessionManager sessionManager = SessionManager.getInstance();
- private static final IClientManager<PartitionRegionId, ConfigNodeClient> configNodeClientManager =
- new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
- .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
-
private AuthorityChecker() {}
/**
@@ -118,7 +105,7 @@ public class AuthorityChecker {
*/
public static boolean checkPermission(
String username, List<? extends PartialPath> paths, StatementType type, String targetUser)
- throws ConfigNodeConnectionException {
+ throws AuthException, ConfigNodeConnectionException {
if (SUPER_USER.equals(username)) {
return true;
}
@@ -141,7 +128,7 @@ public class AuthorityChecker {
allPath.add(AuthUtils.ROOT_PATH_PRIVILEGE);
}
- TSStatus status = checkPath(username, allPath, permission);
+ TSStatus status = authorizerManager.checkPermissionCache(username, allPath, permission);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return true;
} else {
@@ -163,25 +150,6 @@ public class AuthorityChecker {
return false;
}
- /** Check the user */
- public static TSStatus checkUser(String username, String password)
- throws ConfigNodeConnectionException {
- TLoginReq req = new TLoginReq(username, password);
- TSStatus status = null;
- try (ConfigNodeClient configNodeClient =
- configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- // Send request to some API server
- status = configNodeClient.login(req);
- } catch (TException | IOException e) {
- throw new ConfigNodeConnectionException("Couldn't connect config node");
- } finally {
- if (status == null) {
- status = new TSStatus();
- }
- }
- return status;
- }
-
/** Check whether specific Session has the authorization to given plan. */
public static TSStatus checkAuthority(Statement statement, long sessionId) {
try {
@@ -214,24 +182,6 @@ public class AuthorityChecker {
username, statement.getPaths(), statement.getType(), targetUser);
}
- public static TSStatus checkPath(String username, List<String> allPath, int permission)
- throws ConfigNodeConnectionException {
- TCheckUserPrivilegesReq req = new TCheckUserPrivilegesReq(username, allPath, permission);
- TSStatus status = null;
- try (ConfigNodeClient configNodeClient =
- configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- // Send request to some API server
- status = configNodeClient.checkUserPrivileges(req);
- } catch (TException | IOException e) {
- throw new ConfigNodeConnectionException("Couldn't connect config node");
- } finally {
- if (status == null) {
- status = new TSStatus();
- }
- }
- return status;
- }
-
private static int translateToPermissionId(Operator.OperatorType type) {
switch (type) {
case GRANT_ROLE_PRIVILEGE:
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityFetcher.java
similarity index 67%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
copy to server/src/main/java/org/apache/iotdb/db/auth/AuthorityFetcher.java
index 31be6557a2..78d92bb33e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityFetcher.java
@@ -17,29 +17,31 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.execution.config;
+package org.apache.iotdb.db.auth;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
+import org.apache.iotdb.db.mpp.plan.execution.config.AuthorizerConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.rpc.ConfigNodeConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -51,54 +53,52 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
-public class AuthorizerConfigTask implements IConfigTask {
+public class AuthorityFetcher {
- private static final Logger LOGGER = LoggerFactory.getLogger(AuthorizerConfigTask.class);
+ // Log under this class's own name so messages are attributed to AuthorityFetcher.
+ private static final Logger logger = LoggerFactory.getLogger(AuthorityFetcher.class);
- private AuthorStatement authorStatement;
+ private static final IClientManager<PartitionRegionId, ConfigNodeClient> configNodeClientManager =
+ new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
+ .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
- public AuthorizerConfigTask(AuthorStatement authorStatement) {
- this.authorStatement = authorStatement;
+ /**
+  * Sends a {@link TCheckUserPrivilegesReq} built from the arguments to the config node.
+  *
+  * @param username user name placed in the request
+  * @param allPath paths placed in the request
+  * @param permission privilege id placed in the request
+  * @return the config node's response, or an empty {@link TPermissionInfoResp} if the client
+  *     returned {@code null}
+  * @throws ConfigNodeConnectionException if borrowing the client or the remote call fails
+  */
+ public static TPermissionInfoResp checkPath(String username, List<String> allPath, int permission)
+     throws ConfigNodeConnectionException {
+   TCheckUserPrivilegesReq req = new TCheckUserPrivilegesReq(username, allPath, permission);
+   TPermissionInfoResp status = null;
+   try (ConfigNodeClient configNodeClient =
+       configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+     // Send request to some API server
+     status = configNodeClient.checkUserPrivileges(req);
+   } catch (TException | IOException e) {
+     // The exception thrown below carries only a message, so record the root cause here.
+     logger.error("Failed to check privileges of user {} on config node", username, e);
+     throw new ConfigNodeConnectionException("Couldn't connect config node");
+   } finally {
+     if (status == null) {
+       status = new TPermissionInfoResp();
+     }
+   }
+   return status;
}
- @Override
- public ListenableFuture<ConfigTaskResult> execute(
- IClientManager<PartitionRegionId, ConfigNodeClient> clientManager) {
- SettableFuture<ConfigTaskResult> future = null;
+ /** Check the user */
+ public static TPermissionInfoResp checkUser(String username, String password)
+ throws ConfigNodeConnectionException {
+ TLoginReq req = new TLoginReq(username, password);
+ TPermissionInfoResp status = null;
try (ConfigNodeClient configNodeClient =
- clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- // Construct request using statement
- TAuthorizerReq req =
- new TAuthorizerReq(
- authorStatement.getAuthorType().ordinal(),
- authorStatement.getUserName() == null ? "" : authorStatement.getUserName(),
- authorStatement.getRoleName() == null ? "" : authorStatement.getRoleName(),
- authorStatement.getPassWord() == null ? "" : authorStatement.getPassWord(),
- authorStatement.getNewPassword() == null ? "" : authorStatement.getNewPassword(),
- AuthorPlan.strToPermissions(authorStatement.getPrivilegeList()),
- authorStatement.getNodeName() == null
- ? ""
- : authorStatement.getNodeName().getFullPath());
-
+ configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
// Send request to some API server
- if (authorStatement.getQueryType() == QueryType.WRITE) {
- future = operatePermission(req, configNodeClient);
- } else {
- future = queryPermission(req, configNodeClient);
+ status = configNodeClient.login(req);
+ } catch (TException | IOException e) {
+ throw new ConfigNodeConnectionException("Couldn't connect config node");
+ } finally {
+ if (status == null) {
+ status = new TPermissionInfoResp();
}
- } catch (AuthException e) {
- LOGGER.error("No such privilege {}.", authorStatement.getAuthorType());
- future.setException(e);
- } catch (IOException e) {
- LOGGER.error("can't connect to all config nodes", e);
- future.setException(e);
}
- // If the action is executed successfully, return the Future.
- // If your operation is async, you can return the corresponding future directly.
- return future;
+ return status;
}
- private SettableFuture<ConfigTaskResult> operatePermission(
+ public static SettableFuture<ConfigTaskResult> operatePermission(
TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try {
@@ -106,7 +106,7 @@ public class AuthorizerConfigTask implements IConfigTask {
TSStatus tsStatus = configNodeClient.operatePermission(authorizerReq);
// Get response or throw exception
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- LOGGER.error(
+ logger.error(
"Failed to execute {} in config node, status is {}.",
AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
.toString()
@@ -117,7 +117,7 @@ public class AuthorizerConfigTask implements IConfigTask {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (TException e) {
- LOGGER.error("Failed to connect to config node.");
+ logger.error("Failed to connect to config node.");
future.setException(e);
}
// If the action is executed successfully, return the Future.
@@ -125,7 +125,7 @@ public class AuthorizerConfigTask implements IConfigTask {
return future;
}
- private SettableFuture<ConfigTaskResult> queryPermission(
+ public static SettableFuture<ConfigTaskResult> queryPermission(
TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TAuthorizerResp authorizerResp;
@@ -134,7 +134,7 @@ public class AuthorizerConfigTask implements IConfigTask {
authorizerResp = configNodeClient.queryPermission(authorizerReq);
// Get response or throw exception
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != authorizerResp.getStatus().getCode()) {
- LOGGER.error(
+ logger.error(
"Failed to execute {} in config node, status is {}.",
AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
.toString()
@@ -172,12 +172,8 @@ public class AuthorizerConfigTask implements IConfigTask {
new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
}
} catch (TException e) {
- LOGGER.error("Failed to connect to config node.");
+ logger.error("Failed to connect to config node.");
future.setException(e);
- } finally {
- if (configNodeClient != null) {
- configNodeClient.close();
- }
}
// If the action is executed successfully, return the Future.
// If your operation is async, you can return the corresponding future directly.
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java
index 7ea4e3f225..057f9f9d57 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java
@@ -19,21 +19,38 @@
package org.apache.iotdb.db.auth;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.authorizer.BasicAuthorizer;
import org.apache.iotdb.commons.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.commons.auth.entity.PathPrivilege;
import org.apache.iotdb.commons.auth.entity.Role;
import org.apache.iotdb.commons.auth.entity.User;
-
+import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.rpc.ConfigNodeConnectionException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class AuthorizerManager implements IAuthorizer {
@@ -41,12 +58,25 @@ public class AuthorizerManager implements IAuthorizer {
private static final Logger logger = LoggerFactory.getLogger(AuthorizerManager.class);
private IAuthorizer iAuthorizer;
- private ReentrantReadWriteLock snapshotLock;
+ private ReentrantReadWriteLock authReadWriteLock;
+ private IoTDBDescriptor conf = IoTDBDescriptor.getInstance();
+
+ private Cache<String, User> userCache =
+ Caffeine.newBuilder()
+ .maximumSize(conf.getConfig().getAuthorCacheSize())
+ .expireAfterAccess(conf.getConfig().getAuthorCacheExpireTime(), TimeUnit.MINUTES)
+ .build();
+
+ private Cache<String, Role> roleCache =
+ Caffeine.newBuilder()
+ .maximumSize(conf.getConfig().getAuthorCacheSize())
+ .expireAfterAccess(conf.getConfig().getAuthorCacheExpireTime(), TimeUnit.MINUTES)
+ .build();
public AuthorizerManager() {
try {
iAuthorizer = BasicAuthorizer.getInstance();
- snapshotLock = new ReentrantReadWriteLock();
+ authReadWriteLock = new ReentrantReadWriteLock();
} catch (AuthException e) {
logger.error(e.getMessage());
}
@@ -65,146 +95,146 @@ public class AuthorizerManager implements IAuthorizer {
@Override
public boolean login(String username, String password) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.login(username, password);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@Override
public void createUser(String username, String password) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.createUser(username, password);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void deleteUser(String username) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.deleteUser(username);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void grantPrivilegeToUser(String username, String path, int privilegeId)
throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.grantPrivilegeToUser(username, path, privilegeId);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void revokePrivilegeFromUser(String username, String path, int privilegeId)
throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.revokePrivilegeFromUser(username, path, privilegeId);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void createRole(String roleName) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.createRole(roleName);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void deleteRole(String roleName) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.deleteRole(roleName);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void grantPrivilegeToRole(String roleName, String path, int privilegeId)
throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.grantPrivilegeToRole(roleName, path, privilegeId);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void revokePrivilegeFromRole(String roleName, String path, int privilegeId)
throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.revokePrivilegeFromRole(roleName, path, privilegeId);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void grantRoleToUser(String roleName, String username) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.grantRoleToUser(roleName, username);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void revokeRoleFromUser(String roleName, String username) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.revokeRoleFromUser(roleName, username);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public Set<Integer> getPrivileges(String username, String path) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.getPrivileges(username, path);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@Override
public void updateUserPassword(String username, String newPassword) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.updateUserPassword(username, newPassword);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public boolean checkUserPrivileges(String username, String path, int privilegeId)
throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.checkUserPrivileges(username, path, privilegeId);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@@ -215,131 +245,317 @@ public class AuthorizerManager implements IAuthorizer {
@Override
public List<String> listAllUsers() {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.listAllUsers();
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@Override
public List<String> listAllRoles() {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.listAllRoles();
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@Override
public Role getRole(String roleName) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.getRole(roleName);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@Override
public User getUser(String username) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.getUser(username);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@Override
public boolean isUserUseWaterMark(String userName) throws AuthException {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.isUserUseWaterMark(userName);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@Override
public void setUserUseWaterMark(String userName, boolean useWaterMark) throws AuthException {
- snapshotLock.readLock().lock();
+ // This call changes authorizer state, so it takes the write lock like the other mutators.
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.setUserUseWaterMark(userName, useWaterMark);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public Map<String, Boolean> getAllUserWaterMarkStatus() {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.getAllUserWaterMarkStatus();
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@Override
public Map<String, User> getAllUsers() {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.getAllUsers();
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@Override
public Map<String, Role> getAllRoles() {
- snapshotLock.readLock().lock();
+ authReadWriteLock.readLock().lock();
try {
return iAuthorizer.getAllRoles();
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.readLock().unlock();
}
}
@Override
public void replaceAllUsers(Map<String, User> users) throws AuthException {
- snapshotLock.readLock().lock();
+ // Replacing every user is a mutation, so it takes the write lock like the other mutators.
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.replaceAllUsers(users);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void replaceAllRoles(Map<String, Role> roles) throws AuthException {
- snapshotLock.readLock().lock();
+ // Replacing every role is a mutation, so it takes the write lock like the other mutators.
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.replaceAllRoles(roles);
} finally {
- snapshotLock.readLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
- snapshotLock.writeLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
return iAuthorizer.processTakeSnapshot(snapshotDir);
} finally {
- snapshotLock.writeLock().unlock();
+ authReadWriteLock.writeLock().unlock();
}
}
@Override
public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
- snapshotLock.writeLock().lock();
+ authReadWriteLock.writeLock().lock();
try {
iAuthorizer.processLoadSnapshot(snapshotDir);
} finally {
- snapshotLock.writeLock().unlock();
+ authReadWriteLock.writeLock().unlock();
+ }
+ }
+
+ /**
+  * Checks {@code permission} on every path in {@code allPath} against the cached user and role
+  * entries, deferring to {@link #checkPath} when the cache cannot answer.
+  *
+  * @return a success status when every path passes, a no-permission status on the first path
+  *     that neither the user nor any of its cached roles covers, or the result of {@link
+  *     #checkPath} when the user or one of its roles is not cached
+  */
+ public TSStatus checkPermissionCache(String username, List<String> allPath, int permission)
+     throws AuthException, ConfigNodeConnectionException {
+   authReadWriteLock.readLock().lock();
+   try {
+     User cachedUser = userCache.getIfPresent(username);
+     if (cachedUser == null) {
+       // Nothing cached for this user: ask the config node.
+       return checkPath(username, allPath, permission);
+     }
+     for (String path : allPath) {
+       if (cachedUser.checkPrivilege(path, permission)) {
+         continue;
+       }
+       if (cachedUser.getRoleList().isEmpty()) {
+         return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR);
+       }
+       boolean grantedByRole = false;
+       for (String roleName : cachedUser.getRoleList()) {
+         Role cachedRole = roleCache.getIfPresent(roleName);
+         if (cachedRole == null) {
+           // A role of the user is missing from the cache, which indicates that the role's
+           // permission information has changed: drop the user's entries and re-check remotely.
+           invalidateCache(username, "");
+           return checkPath(username, allPath, permission);
+         }
+         if (cachedRole.checkPrivilege(path, permission)) {
+           grantedByRole = true;
+           break;
+         }
+       }
+       if (!grantedByRole) {
+         return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR);
+       }
+     }
+     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   } finally {
+     authReadWriteLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Fetches the permission check result through {@link AuthorityFetcher#checkPath} and, when the
+  * returned status is a success, caches the user built from the response.
+  *
+  * @return the status carried by the response
+  */
+ public TSStatus checkPath(String username, List<String> allPath, int permission)
+     throws ConfigNodeConnectionException {
+   TPermissionInfoResp resp = AuthorityFetcher.checkPath(username, allPath, permission);
+   boolean succeeded =
+       resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+   if (succeeded) {
+     userCache.put(username, cacheUser(resp));
+   }
+   return resp.getStatus();
}
+
+ /**
+  * Validates a user's password, using the cached user when present and the config node
+  * otherwise.
+  *
+  * <p>On a cache miss the response of {@link AuthorityFetcher#checkUser} decides the result, and
+  * a successful response is cached.
+  */
+ public TSStatus checkUser(String username, String password) throws ConfigNodeConnectionException {
+   authReadWriteLock.readLock().lock();
+   try {
+     User cachedUser = userCache.getIfPresent(username);
+     if (cachedUser == null) {
+       TPermissionInfoResp resp = AuthorityFetcher.checkUser(username, password);
+       boolean succeeded =
+           resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+       if (succeeded) {
+         userCache.put(username, cacheUser(resp));
+       }
+       return resp.getStatus();
+     }
+     boolean passwordMatches =
+         password != null && AuthUtils.validatePassword(password, cachedUser.getPassword());
+     if (!passwordMatches) {
+       return RpcUtils.getStatus(
+           TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR, "Authentication failed.");
+     }
+     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+   } finally {
+     authReadWriteLock.readLock().unlock();
+   }
+ }
+
+ /** Delegates to {@link AuthorityFetcher#queryPermission} while holding the read lock. */
+ public SettableFuture<ConfigTaskResult> queryPermission(
+     TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
+   final ReentrantReadWriteLock.ReadLock readLock = authReadWriteLock.readLock();
+   readLock.lock();
+   try {
+     return AuthorityFetcher.queryPermission(authorizerReq, configNodeClient);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /** Delegates to {@link AuthorityFetcher#operatePermission} while holding the write lock. */
+ public SettableFuture<ConfigTaskResult> operatePermission(
+     TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
+   final ReentrantReadWriteLock.WriteLock writeLock = authReadWriteLock.writeLock();
+   writeLock.lock();
+   try {
+     return AuthorityFetcher.operatePermission(authorizerReq, configNodeClient);
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ /**
+  * Builds a {@link User} from the user info in {@code tPermissionInfoResp} and stores every role
+  * carried by the response in the role cache.
+  *
+  * <p>The privilege list is read as a flat sequence of (path, privilege ids) pairs. A missing
+  * list or role map is treated as empty, and a trailing path with no ids entry is ignored.
+  *
+  * @param tPermissionInfoResp response whose user and role info is converted
+  * @return the user built from the response; this method does not put it into the user cache
+  */
+ public User cacheUser(TPermissionInfoResp tPermissionInfoResp) {
+   User user = new User();
+   user.setName(tPermissionInfoResp.getUserInfo().getUsername());
+   user.setPassword(tPermissionInfoResp.getUserInfo().getPassword());
+   List<String> privilegeList = tPermissionInfoResp.getUserInfo().getPrivilegeList();
+   List<PathPrivilege> pathPrivilegeList = new ArrayList<>();
+   if (privilegeList != null) {
+     // Walk the list two entries at a time; the bound keeps an odd-length list from being
+     // indexed past its end.
+     for (int i = 0; i + 1 < privilegeList.size(); i += 2) {
+       pathPrivilegeList.add(toPathPrivilege(privilegeList.get(i), privilegeList.get(i + 1)));
+     }
+   }
+   user.setPrivilegeList(pathPrivilegeList);
+   user.setRoleList(tPermissionInfoResp.getUserInfo().getRoleList());
+   if (tPermissionInfoResp.getRoleInfo() != null) {
+     for (String roleName : tPermissionInfoResp.getRoleInfo().keySet()) {
+       roleCache.put(roleName, cacheRole(roleName, tPermissionInfoResp));
+     }
+   }
+   return user;
+ }
+
+ /**
+  * Builds a {@link Role} from the entry stored under {@code roleName} in the response's role
+  * info.
+  *
+  * <p>The privilege list is read as a flat sequence of (path, privilege ids) pairs. A missing
+  * list is treated as empty, and a trailing path with no ids entry is ignored.
+  *
+  * @param roleName key of the role inside the response's role info
+  * @param tPermissionInfoResp response carrying the role info
+  * @return the role built from the response; this method does not put it into the role cache
+  */
+ public Role cacheRole(String roleName, TPermissionInfoResp tPermissionInfoResp) {
+   Role role = new Role();
+   role.setName(tPermissionInfoResp.getRoleInfo().get(roleName).getRoleName());
+   List<String> privilegeList = tPermissionInfoResp.getRoleInfo().get(roleName).getPrivilegeList();
+   List<PathPrivilege> pathPrivilegeList = new ArrayList<>();
+   if (privilegeList != null) {
+     // Walk the list two entries at a time; the bound keeps an odd-length list from being
+     // indexed past its end.
+     for (int i = 0; i + 1 < privilegeList.size(); i += 2) {
+       pathPrivilegeList.add(toPathPrivilege(privilegeList.get(i), privilegeList.get(i + 1)));
+     }
+   }
+   role.setPrivilegeList(pathPrivilegeList);
+   return role;
+ }
+
+ /**
+  * Invalidates cached user and role information.
+  *
+  * <p>If {@code username} is cached, the roles in its role list and the user entry itself are
+  * removed. The entry for {@code roleName} is removed as well. If the permission information of
+  * a role changes, only the role cache entry is cleared; during permission checking, a user that
+  * holds the missing role is then re-fetched.
+  *
+  * @param username user whose cache entry (and cached roles) should be dropped
+  * @param roleName role whose cache entry should be dropped
+  * @return {@code false} if either entry is still present afterwards, {@code true} otherwise
+  */
+ public boolean invalidateCache(String username, String roleName) {
+   // Look the user up once: a second lookup could return null if the entry expired in between.
+   User cachedUser = userCache.getIfPresent(username);
+   if (cachedUser != null) {
+     List<String> roleList = cachedUser.getRoleList();
+     if (roleList != null && !roleList.isEmpty()) {
+       roleCache.invalidateAll(roleList);
+     }
+     userCache.invalidate(username);
+   }
+   if (roleCache.getIfPresent(roleName) != null) {
+     roleCache.invalidate(roleName);
+   }
+   // Invalidation failed if EITHER entry survived, not only when both did.
+   if (userCache.getIfPresent(username) != null || roleCache.getIfPresent(roleName) != null) {
+     logger.error("datanode cache initialization failed");
+     return false;
+   }
+   return true;
+ }
+
+ /**
+  * Convert user privilege information obtained from confignode to PathPrivilege
+  *
+  * @param path permission path
+  * @param privilege comma-separated privilege ids; spaces are ignored
+  * @return a {@link PathPrivilege} for {@code path} holding the parsed ids (empty when {@code
+  *     privilege} contains no ids)
+  * @throws NumberFormatException if a non-empty id is not an integer
+  */
+ private PathPrivilege toPathPrivilege(String path, String privilege) {
+   Set<Integer> privilegeIds = new HashSet<>();
+   for (String id : privilege.replace(" ", "").split(",")) {
+     // An empty id string splits into a single empty token; skip it rather than failing in
+     // Integer.parseInt.
+     if (!id.isEmpty()) {
+       privilegeIds.add(Integer.parseInt(id));
+     }
+   }
+   PathPrivilege pathPrivilege = new PathPrivilege();
+   pathPrivilege.setPrivileges(privilegeIds);
+   pathPrivilege.setPath(path);
+   return pathPrivilege;
+ }
+
+ public Cache<String, User> getUserCache() {
+ return userCache;
+ }
+
+ public Cache<String, Role> getRoleCache() {
+ return roleCache;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index f1a9f40bda..ebd6034542 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
@@ -524,11 +525,11 @@ public class ConfigNodeClient implements ConfigIService.Iface, SyncThriftClient,
}
@Override
- public TSStatus login(TLoginReq req) throws TException {
+ public TPermissionInfoResp login(TLoginReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
- TSStatus status = client.login(req);
- if (!updateConfigNodeLeader(status)) {
+ TPermissionInfoResp status = client.login(req);
+ if (!updateConfigNodeLeader(status.getStatus())) {
return status;
}
} catch (TException e) {
@@ -540,11 +541,11 @@ public class ConfigNodeClient implements ConfigIService.Iface, SyncThriftClient,
}
@Override
- public TSStatus checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
+ public TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
- TSStatus status = client.checkUserPrivileges(req);
- if (!updateConfigNodeLeader(status)) {
+ TPermissionInfoResp status = client.checkUserPrivileges(req);
+ if (!updateConfigNodeLeader(status.getStatus())) {
return status;
}
} catch (TException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index cf3e35f044..14f427eb7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -881,6 +881,12 @@ public class IoTDBConfig {
*/
private int partitionCacheSize = 10000;
+ /** Cache size of user and role */
+ private int authorCacheSize = 100;
+
+ /** Cache expire time of user and role, in minutes, counted from the last access */
+ private int authorCacheExpireTime = 30;
+
IoTDBConfig() {}
public float getUdfMemoryBudgetInMB() {
@@ -2760,4 +2766,20 @@ public class IoTDBConfig {
public void setPartitionCacheSize(int partitionCacheSize) {
this.partitionCacheSize = partitionCacheSize;
}
+
+ public int getAuthorCacheSize() {
+ return authorCacheSize;
+ }
+
+ public void setAuthorCacheSize(int authorCacheSize) {
+ this.authorCacheSize = authorCacheSize;
+ }
+
+ public int getAuthorCacheExpireTime() {
+ return authorCacheExpireTime;
+ }
+
+ public void setAuthorCacheExpireTime(int authorCacheExpireTime) {
+ this.authorCacheExpireTime = authorCacheExpireTime;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f40850fef2..d5d9fc6c78 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -887,6 +887,9 @@ public class IoTDBDescriptor {
// shuffle
loadShuffleProps(properties);
+
+ // author cache
+ loadAuthorCache(properties);
} catch (FileNotFoundException e) {
logger.warn("Fail to find config file {}", url, e);
} catch (IOException e) {
@@ -900,6 +903,17 @@ public class IoTDBDescriptor {
}
}
+ /**
+  * Loads {@code author_cache_size} and {@code author_cache_expire_time} into the config, keeping
+  * the current values when a property is absent.
+  *
+  * @param properties loaded configuration properties
+  * @throws NumberFormatException if a configured value is not an integer
+  */
+ private void loadAuthorCache(Properties properties) {
+   // Values are trimmed because java.util.Properties keeps trailing whitespace, which
+   // Integer.parseInt rejects.
+   conf.setAuthorCacheSize(
+       Integer.parseInt(
+           properties
+               .getProperty("author_cache_size", String.valueOf(conf.getAuthorCacheSize()))
+               .trim()));
+   conf.setAuthorCacheExpireTime(
+       Integer.parseInt(
+           properties
+               .getProperty(
+                   "author_cache_expire_time", String.valueOf(conf.getAuthorCacheExpireTime()))
+               .trim()));
+ }
+
// to keep consistent with the cluster module.
private void replaceHostnameWithIP() throws UnknownHostException, BadNodeUrlFormatException {
boolean isInvalidRpcIp = InetAddresses.isInetAddress(conf.getRpcAddress());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
index 31be6557a2..3193464919 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
@@ -19,43 +19,30 @@
package org.apache.iotdb.db.mpp.plan.execution.config;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
-import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
+import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
-import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
public class AuthorizerConfigTask implements IConfigTask {
private static final Logger LOGGER = LoggerFactory.getLogger(AuthorizerConfigTask.class);
private AuthorStatement authorStatement;
+ private AuthorizerManager authorizerManager = AuthorizerManager.getInstance();
public AuthorizerConfigTask(AuthorStatement authorStatement) {
this.authorStatement = authorStatement;
@@ -82,9 +69,9 @@ public class AuthorizerConfigTask implements IConfigTask {
// Send request to some API server
if (authorStatement.getQueryType() == QueryType.WRITE) {
- future = operatePermission(req, configNodeClient);
+ future = authorizerManager.operatePermission(req, configNodeClient);
} else {
- future = queryPermission(req, configNodeClient);
+ future = authorizerManager.queryPermission(req, configNodeClient);
}
} catch (AuthException e) {
LOGGER.error("No such privilege {}.", authorStatement.getAuthorType());
@@ -97,90 +84,4 @@ public class AuthorizerConfigTask implements IConfigTask {
// If your operation is async, you can return the corresponding future directly.
return future;
}
-
- private SettableFuture<ConfigTaskResult> operatePermission(
- TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
- SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- try {
- // Send request to some API server
- TSStatus tsStatus = configNodeClient.operatePermission(authorizerReq);
- // Get response or throw exception
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- LOGGER.error(
- "Failed to execute {} in config node, status is {}.",
- AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
- .toString()
- .toLowerCase(Locale.ROOT),
- tsStatus);
- future.setException(new StatementExecutionException(tsStatus));
- } else {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- }
- } catch (TException e) {
- LOGGER.error("Failed to connect to config node.");
- future.setException(e);
- }
- // If the action is executed successfully, return the Future.
- // If your operation is async, you can return the corresponding future directly.
- return future;
- }
-
- private SettableFuture<ConfigTaskResult> queryPermission(
- TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
- SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- TAuthorizerResp authorizerResp;
- try {
- // Send request to some API server
- authorizerResp = configNodeClient.queryPermission(authorizerReq);
- // Get response or throw exception
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != authorizerResp.getStatus().getCode()) {
- LOGGER.error(
- "Failed to execute {} in config node, status is {}.",
- AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
- .toString()
- .toLowerCase(Locale.ROOT),
- authorizerResp.getStatus());
- future.setException(new StatementExecutionException(authorizerResp.getStatus()));
- } else {
- // build TSBlock
- List<TSDataType> types = new ArrayList<>();
- Map<String, List<String>> authorizerInfo = authorizerResp.getAuthorizerInfo();
- for (int i = 0; i < authorizerInfo.size(); i++) {
- types.add(TSDataType.TEXT);
- }
- TsBlockBuilder builder = new TsBlockBuilder(types);
- List<ColumnHeader> headerList = new ArrayList<>();
-
- for (String header : authorizerInfo.keySet()) {
- headerList.add(new ColumnHeader(header, TSDataType.TEXT));
- }
- // The Time column will be ignored by the setting of ColumnHeader.
- // So we can put a meaningless value here
- for (String value : authorizerInfo.get(headerList.get(0).getColumnName())) {
- builder.getTimeColumnBuilder().writeLong(0L);
- builder.getColumnBuilder(0).writeBinary(new Binary(value));
- builder.declarePosition();
- }
- for (int i = 1; i < headerList.size(); i++) {
- for (String value : authorizerInfo.get(headerList.get(i).getColumnName())) {
- builder.getColumnBuilder(i).writeBinary(new Binary(value));
- }
- }
-
- DatasetHeader datasetHeader = new DatasetHeader(headerList, true);
- future.set(
- new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
- }
- } catch (TException e) {
- LOGGER.error("Failed to connect to config node.");
- future.setException(e);
- } finally {
- if (configNodeClient != null) {
- configNodeClient.close();
- }
- }
- // If the action is executed successfully, return the Future.
- // If your operation is async, you can return the corresponding future directly.
- return future;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 86bc50adef..2823642630 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
@@ -151,7 +152,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
TSStatus loginStatus;
try {
- loginStatus = AuthorityChecker.checkUser(req.username, req.password);
+ loginStatus = AuthorizerManager.getInstance().checkUser(req.username, req.password);
} catch (ConfigNodeConnectionException e) {
TSStatus tsStatus = RpcUtils.getStatus(TSStatusCode.AUTHENTICATION_ERROR, e.getMessage());
return new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 93e679610f..4c7f08edf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.consensus.ConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.exception.DataRegionException;
@@ -75,12 +76,14 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TMigrateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TMigrateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -325,6 +328,14 @@ public class InternalServiceImpl implements InternalService.Iface {
return result;
}
+  @Override
+  public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) throws TException {
+    // Translate the boolean outcome of the local cache invalidation into an RPC status.
+    return RpcUtils.getStatus(
+        AuthorizerManager.getInstance().invalidateCache(req.getUsername(), req.getRoleName())
+            ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.INVALIDATE_PERMISSION_CACHE_ERROR);
+  }
+
@Override
public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) throws TException {
long queryIdRaw = SessionManager.getInstance().requestQueryId(false);
diff --git a/server/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java b/server/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java
new file mode 100644
index 0000000000..2dfa147efc
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.auth;
+
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.auth.entity.PathPrivilege;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.auth.entity.Role;
+import org.apache.iotdb.commons.auth.entity.User;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRoleResp;
+import org.apache.iotdb.confignode.rpc.thrift.TUserResp;
+import org.apache.iotdb.rpc.ConfigNodeConnectionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Checks AuthorizerManager's user/role caches using a hand-built {@link TPermissionInfoResp}. */
+public class AuthorizerManagerTest {
+  private final AuthorizerManager authorizerManager = AuthorizerManager.getInstance();
+
+  @Test
+  public void permissionCacheTest() throws ConfigNodeConnectionException, AuthException {
+    // Expected entities: one user holding two roles; user and role1 share the same privileges.
+    User user = new User();
+    Role role1 = new Role();
+    Role role2 = new Role();
+    List<String> roleList = new ArrayList<>();
+    Set<Integer> privilegesIds = new HashSet<>();
+    PathPrivilege privilege = new PathPrivilege();
+    List<PathPrivilege> privilegeList = new ArrayList<>();
+    privilegesIds.add(PrivilegeType.CREATE_ROLE.ordinal());
+    privilegesIds.add(PrivilegeType.REVOKE_USER_ROLE.ordinal());
+    privilege.setPath("root.ln");
+    privilege.setPrivileges(privilegesIds);
+    privilegeList.add(privilege);
+    role1.setName("role1");
+    role1.setPrivilegeList(privilegeList);
+    role2.setName("role2");
+    role2.setPrivilegeList(new ArrayList<>());
+    roleList.add("role1");
+    roleList.add("role2");
+    user.setName("user");
+    user.setPassword("password");
+    user.setPrivilegeList(privilegeList);
+    user.setRoleList(roleList);
+    TPermissionInfoResp result = new TPermissionInfoResp();
+    TUserResp tUserResp = new TUserResp();
+    Map<String, TRoleResp> tRoleRespMap = new HashMap<>();
+    List<String> userPrivilegeList = new ArrayList<>();
+    List<Role> roleList1 = new ArrayList<>();
+    roleList1.add(role1);
+    roleList1.add(role2);
+
+    // User permission information, flattened as [path, ids-without-brackets, ...]
+    for (PathPrivilege pathPrivilege : user.getPrivilegeList()) {
+      userPrivilegeList.add(pathPrivilege.getPath());
+      String privilegeIdList = pathPrivilege.getPrivileges().toString();
+      userPrivilegeList.add(privilegeIdList.substring(1, privilegeIdList.length() - 1));
+    }
+    tUserResp.setUsername(user.getName());
+    tUserResp.setPassword(user.getPassword());
+    tUserResp.setPrivilegeList(userPrivilegeList);
+    tUserResp.setRoleList(new ArrayList<>());
+    result.setUserInfo(tUserResp);
+    result.setRoleInfo(new HashMap<>());
+
+    // User authentication permission without role
+    authorizerManager.getUserCache().put(user.getName(), authorizerManager.cacheUser(result));
+    User user1 = authorizerManager.getUserCache().getIfPresent(user.getName());
+    // Fail through JUnit rather than the `assert` keyword, which is skipped unless -ea is set.
+    Assert.assertNotNull(user1);
+    Assert.assertEquals(user.getName(), user1.getName());
+    Assert.assertEquals(user.getPassword(), user1.getPassword());
+    Assert.assertEquals(user.getPrivilegeList(), user1.getPrivilegeList());
+
+    // User has permission
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        authorizerManager
+            .checkPermissionCache(
+                "user", Collections.singletonList("root.ln"), PrivilegeType.CREATE_ROLE.ordinal())
+            .getCode());
+    // User does not have permission
+    Assert.assertEquals(
+        TSStatusCode.NO_PERMISSION_ERROR.getStatusCode(),
+        authorizerManager
+            .checkPermissionCache(
+                "user", Collections.singletonList("root.ln"), PrivilegeType.CREATE_USER.ordinal())
+            .getCode());
+
+    // Authenticate users with roles
+    authorizerManager.invalidateCache(user.getName(), "");
+    tUserResp.setPrivilegeList(new ArrayList<>());
+    tUserResp.setRoleList(user.getRoleList());
+
+    // Permission information for roles owned by users
+    for (Role role : roleList1) {
+      TRoleResp tRoleResp = new TRoleResp();
+      List<String> rolePrivilegeList = new ArrayList<>();
+      tRoleResp.setRoleName(role.getName());
+      for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+        rolePrivilegeList.add(pathPrivilege.getPath());
+        String privilegeIdList = pathPrivilege.getPrivileges().toString();
+        rolePrivilegeList.add(privilegeIdList.substring(1, privilegeIdList.length() - 1));
+      }
+      tRoleResp.setPrivilegeList(rolePrivilegeList);
+      tRoleRespMap.put(role.getName(), tRoleResp);
+    }
+    result.setRoleInfo(tRoleRespMap);
+    authorizerManager.getUserCache().put(user.getName(), authorizerManager.cacheUser(result));
+    // cacheUser() is expected to have populated the role cache as a side effect.
+    Role role3 = authorizerManager.getRoleCache().getIfPresent(role1.getName());
+    Assert.assertNotNull(role3);
+    Assert.assertEquals(role1.getName(), role3.getName());
+    Assert.assertEquals(role1.getPrivilegeList(), role3.getPrivilegeList());
+
+    // role has permission
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        authorizerManager
+            .checkPermissionCache(
+                "user", Collections.singletonList("root.ln"), PrivilegeType.CREATE_ROLE.ordinal())
+            .getCode());
+    // role does not have permission
+    Assert.assertEquals(
+        TSStatusCode.NO_PERMISSION_ERROR.getStatusCode(),
+        authorizerManager
+            .checkPermissionCache(
+                "user", Collections.singletonList("root.ln"), PrivilegeType.CREATE_USER.ordinal())
+            .getCode());
+
+    // After invalidating the user, neither the user nor its role may remain cached.
+    authorizerManager.invalidateCache(user.getName(), "");
+    Assert.assertNull(authorizerManager.getUserCache().getIfPresent(user.getName()));
+    Assert.assertNull(authorizerManager.getRoleCache().getIfPresent(role1.getName()));
+  }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 08146da4b4..6d72f1d7be 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -104,6 +104,7 @@ public enum TSStatusCode {
USER_NOT_EXIST_ERROR(605),
ROLE_NOT_EXIST_ERROR(606),
AUTHENTICATION_ERROR(607),
+ INVALIDATE_PERMISSION_CACHE_ERROR(608),
// cluster-related errors
PARTITION_NOT_READY(700),
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 4b933e548f..f66e743b96 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -164,6 +164,24 @@ struct TAuthorizerResp {
2: required map<string, list<string>> authorizerInfo
}
+struct TUserResp{ // user record carried in TPermissionInfoResp.userInfo
+ 1: required string username
+ 2: required string password
+ 3: required list<string> privilegeList // NOTE(review): AuthorizerManagerTest fills this as alternating entries (a path, then that path's privilege ids as one comma-separated string) - confirm against the ConfigNode producer
+ 4: required list<string> roleList // NOTE(review): the test stores role names here; presumably they key TPermissionInfoResp.roleInfo - confirm
+}
+
+struct TRoleResp{ // role record; value type of TPermissionInfoResp.roleInfo
+ 1: required string roleName
+ 2: required list<string> privilegeList // NOTE(review): the test fills this in the same flat layout as TUserResp.privilegeList (path, then its privilege ids) - confirm
+}
+
+struct TPermissionInfoResp{ // returned by ConfigIService.login and ConfigIService.checkUserPrivileges
+ 1: required TUserResp userInfo
+ 2: required map<string, TRoleResp> roleInfo // NOTE(review): AuthorizerManagerTest keys this map by role name - confirm for the server side
+ 3: required common.TSStatus status
+}
+
struct TLoginReq {
1: required string userrname
2: required string password
@@ -243,9 +261,9 @@ service ConfigIService {
TAuthorizerResp queryPermission(TAuthorizerReq req)
- common.TSStatus login(TLoginReq req)
+ TPermissionInfoResp login(TLoginReq req)
- common.TSStatus checkUserPrivileges(TCheckUserPrivilegesReq req)
+ TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req)
/* ConfigNode */
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index dfc26e4da0..d55520a9f4 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -135,6 +135,11 @@ struct TSchemaFetchResponse {
1: required binary serializedSchemaTree
}
+struct TInvalidatePermissionCacheReq{ // argument of InternalService.invalidatePermissionCache
+ 1: required string username
+ 2: required string roleName
+}
+
service InternalService {
// -----------------------------------For Data Node-----------------------------------------------
@@ -210,6 +215,13 @@ service InternalService {
* @param ConfigNode will send the latest config_node_list and load balancing policies in THeartbeatReq
**/
common.THeartbeatResp getHeartBeat(common.THeartbeatReq req)
+
+ /**
+ * Config node will invalidate permission Info cache.
+ *
+ * @param string:username, string:roleName
+ */
+ common.TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req)
}
service DataBlockService {