You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/03/17 12:36:28 UTC
[incubator-pegasus] branch master updated: feat(Ranger): Pull policies from the Ranger Service and update using resources policies (#1388)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 034cb9d10 feat(Ranger): Pull policies from the Ranger Service and update using resources policies (#1388)
034cb9d10 is described below
commit 034cb9d107e197c06f909de467258b3503cfb338
Author: WHBANG <38...@users.noreply.github.com>
AuthorDate: Fri Mar 17 20:36:21 2023 +0800
feat(Ranger): Pull policies from the Ranger Service and update using resources policies (#1388)
https://github.com/apache/incubator-pegasus/issues/1054
This patch implements how to pull policies from the Ranger Service
and dump policies to remote storage.
- Pull policies in JSON format from Ranger service and parse
policies from JSON formated string.
- Create the path to save policies in remote storage, and update
using resources policies.
- Dump policies to remote storage.
- Sync policies to app envs.
- Update the cached global/database resources policies.
---
src/common/replica_envs.h | 1 +
src/common/replication.codes.h | 1 +
src/common/replication_common.cpp | 2 +
src/meta/app_env_validator.cpp | 3 +-
.../ranger/ranger_resource_policy_manager.cpp | 409 ++++++++++++++++++++-
.../ranger/ranger_resource_policy_manager.h | 51 ++-
.../test/ranger_resource_policy_manager_test.cpp | 41 +++
src/utils/error_code.h | 4 +
8 files changed, 509 insertions(+), 3 deletions(-)
diff --git a/src/common/replica_envs.h b/src/common/replica_envs.h
index f4c0d5829..4db367a7f 100644
--- a/src/common/replica_envs.h
+++ b/src/common/replica_envs.h
@@ -56,6 +56,7 @@ public:
static const std::string MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION;
static const std::string BUSINESS_INFO;
static const std::string REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS;
+ static const std::string REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES;
static const std::string READ_QPS_THROTTLING;
static const std::string READ_SIZE_THROTTLING;
static const std::string BACKUP_REQUEST_QPS_THROTTLING;
diff --git a/src/common/replication.codes.h b/src/common/replication.codes.h
index 73df12497..72739c27e 100644
--- a/src/common/replication.codes.h
+++ b/src/common/replication.codes.h
@@ -131,6 +131,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_START_MANUAL_COMPACT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_GET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_SET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
+MAKE_EVENT_CODE(LPC_USE_RANGER_ACCESS_CONTROL, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL
#define CURRENT_THREAD_POOL THREAD_POOL_META_STATE
diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp
index 4ab27a74a..63617797a 100644
--- a/src/common/replication_common.cpp
+++ b/src/common/replication_common.cpp
@@ -384,6 +384,8 @@ const std::string replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED("replica.rocksdb_blo
const std::string replica_envs::BUSINESS_INFO("business.info");
const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS(
"replica_access_controller.allowed_users");
+const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES(
+ "replica_access_controller.ranger_policies");
const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling");
const std::string replica_envs::READ_SIZE_THROTTLING("replica.read_throttling_by_size");
const std::string
diff --git a/src/meta/app_env_validator.cpp b/src/meta/app_env_validator.cpp
index f541ee271..41f9c299c 100644
--- a/src/meta/app_env_validator.cpp
+++ b/src/meta/app_env_validator.cpp
@@ -212,7 +212,8 @@ void app_env_validator::register_all_validators()
{replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, nullptr},
{replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr},
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, nullptr},
- {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr}};
+ {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
+ {replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr}};
}
} // namespace replication
diff --git a/src/runtime/ranger/ranger_resource_policy_manager.cpp b/src/runtime/ranger/ranger_resource_policy_manager.cpp
index c9025ca51..3c30e262e 100644
--- a/src/runtime/ranger/ranger_resource_policy_manager.cpp
+++ b/src/runtime/ranger/ranger_resource_policy_manager.cpp
@@ -17,19 +17,61 @@
#include <ctype.h>
#include <algorithm>
+#include <chrono>
+#include <iosfwd>
+#include <memory>
#include <unordered_set>
#include <utility>
+// Disable class-memaccess warning to facilitate compilation with gcc>7
+// https://github.com/Tencent/rapidjson/issues/1700
+#pragma GCC diagnostic push
+#if defined(__GNUC__) && __GNUC__ >= 8
+#pragma GCC diagnostic ignored "-Wclass-memaccess"
+#endif
+#include <rapidjson/document.h>
+
+#pragma GCC diagnostic pop
+
+#include "common/replica_envs.h"
+#include "common/replication.codes.h"
+#include "common/replication_common.h"
+#include "dsn.layer2_types.h"
+#include "fmt/core.h"
#include "meta/meta_options.h"
#include "meta/meta_service.h"
+#include "meta/meta_state_service.h"
+#include "meta/server_state.h"
+#include "meta_admin_types.h"
#include "ranger_resource_policy_manager.h"
+#include "rapidjson/allocators.h"
#include "runtime/ranger/ranger_resource_policy.h"
+#include "runtime/task/async_calls.h"
+#include "runtime/task/task.h"
#include "runtime/task/task_code.h"
+#include "utils/blob.h"
+#include "utils/flags.h"
#include "utils/fmt_logging.h"
+#include "utils/process_utils.h"
+#include "utils/smart_pointers.h"
+#include "utils/strings.h"
namespace dsn {
namespace ranger {
+DSN_DEFINE_uint32(security,
+ update_ranger_policy_interval_sec,
+ 5,
+ "The interval seconds of meta "
+ "server to pull the latest "
+ "access control policy from "
+ "Ranger service.");
+DSN_DEFINE_string(ranger, ranger_service_url, "", "Apache Ranger service url.");
+DSN_DEFINE_string(ranger,
+ ranger_service_name,
+ "",
+ "The name of the policies defined in the Ranger service.");
+
#define RETURN_ERR_IF_MISSING_MEMBER(obj, member) \
do { \
if (!obj.IsObject() || !obj.HasMember(member)) { \
@@ -83,9 +125,11 @@ const std::map<std::string, access_type> kAccessTypeMaping({{"READ", access_type
{"CONTROL", access_type::kControl}});
} // anonymous namespace
+const std::chrono::milliseconds kLoadRangerPolicyRetryDelayMs(10000);
+
ranger_resource_policy_manager::ranger_resource_policy_manager(
dsn::replication::meta_service *meta_svc)
- : _meta_svc(meta_svc) //, _local_policy_version(0)
+ : _meta_svc(meta_svc), _local_policy_version(-1)
{
_ranger_policy_meta_root = dsn::replication::meta_options::concat_path_unix_style(
_meta_svc->cluster_root(), "ranger_policy_meta_root");
@@ -171,5 +215,368 @@ void ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
policies.emplace_back(pi);
}
}
+
+dsn::error_code ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+ std::string ranger_policies;
+ ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+ "Pull Ranger policies failed.");
+ LOG_DEBUG("Pull Ranger policies success.");
+
+ auto err_code = load_policies_from_json(ranger_policies);
+ if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+ LOG_DEBUG("Skip to update local policies.");
+ // For the newly created table, its app envs must be empty. This needs to be executed
+ // periodically to update the table's app envs, regardless of whether the Ranger policy is
+ // updated or not.
+ ERR_LOG_AND_RETURN_NOT_OK(sync_policies_to_app_envs(), "Sync policies to app envs failed.");
+ return dsn::ERR_OK;
+ }
+ ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+ start_to_dump_and_sync_policies();
+
+ return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::pull_policies_from_ranger_service(
+ std::string *ranger_policies) const
+{
+ std::string cmd =
+ fmt::format("curl {}/{}", FLAGS_ranger_service_url, FLAGS_ranger_service_name);
+ std::stringstream resp;
+ if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+ return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+ }
+
+ *ranger_policies = resp.str();
+ return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const std::string &data)
+{
+ // The Ranger policy pulled from Ranger service demo.
+ /*
+ {
+ "serviceName": "PEGASUS1",
+ "serviceId": 1069,
+ "policyVersion": 60,
+ "policyUpdateTime": 1673254471000,
+ "policies": [{
+ "id": 5334,
+ "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+ "isEnabled": true,
+ "version": 13,
+ "service": "PEGASUS1",
+ "name": "all - database",
+ "policyType": 0,
+ "policyPriority": 0,
+ "description": "Policy for all - database",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": ["PEGASUS1"],
+ "isExcludes": false,
+ "isRecursive": true
+ }
+ },
+ "policyItems": [{
+ "accesses": [{
+ "type": "create",
+ "isAllowed": true
+ }, {
+ "type": "drop",
+ "isAllowed": true
+ }, {
+ "type": "control",
+ "isAllowed": true
+ }, {
+ "type": "metadata",
+ "isAllowed": true
+ }, {
+ "type": "list",
+ "isAllowed": true
+ }],
+ "users": ["PEGASUS1"],
+ "groups": [],
+ "roles": [],
+ "conditions": [],
+ "delegateAdmin": true
+ }],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [],
+ "serviceType": "pegasus",
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [],
+ "zoneName": "",
+ "isDenyAllElse": false
+ }],
+ "auditMode": "audit-default",
+ "serviceConfig": {}
+ }
+ */
+ rapidjson::Document doc;
+ doc.Parse(data.c_str());
+
+ // Check if it is needed to update policies.
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+ int remote_policy_version = doc["policyVersion"].GetInt();
+ if (_local_policy_version == remote_policy_version) {
+ LOG_DEBUG("Ranger policy version: {}, no need to update.", _local_policy_version);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version > remote_policy_version) {
+ LOG_WARNING("Local Ranger policy version ({}) is larger than remote version ({}), please "
+ "check Ranger services ({}).",
+ _local_policy_version,
+ remote_policy_version,
+ FLAGS_ranger_service_name);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ _local_policy_version = remote_policy_version;
+
+ // Update policies.
+ _all_resource_policies.clear();
+
+ // TODO(wanghao): it's optional
+ // Provide a DATABASE default policy for legacy tables.
+ // ranger_resource_policy default_database_policy;
+ // ranger_resource_policy::create_default_database_policy(default_database_policy);
+ // _all_resource_policies[enum_to_string(resource_type::kDatabase)] = {default_database_policy};
+
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+ const rapidjson::Value &policies = doc["policies"];
+ RETURN_ERR_IF_NOT_ARRAY(policies);
+ for (const auto &policy : policies.GetArray()) {
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+ // 1. Check if the policy is enabled or not.
+ if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+ continue;
+ }
+
+ // 2. Parse resource type.
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+ std::map<std::string, std::unordered_set<std::string>> values_of_resource_type;
+ for (const auto &resource : policy["resources"].GetObject()) {
+ RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+ RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+ std::unordered_set<std::string> values;
+ for (const auto &v : (resource.value)["values"].GetArray()) {
+ values.insert(v.GetString());
+ }
+ values_of_resource_type.emplace(std::make_pair(resource.name.GetString(), values));
+ }
+
+ // 3. Construct ACL policy.
+ ranger_resource_policy resource_policy;
+ CONTINUE_IF_MISSING_MEMBER(policy, "name");
+ resource_policy.name = policy["name"].GetString();
+
+ resource_type rt = resource_type::kUnknown;
+ do {
+ // TODO(wanghao): refactor the following code
+ // parse Ranger policies json string into `values_of_resource_type`, distinguish
+ // resource types by `values_of_resource_type.size()`
+ if (values_of_resource_type.size() == 1) {
+ auto iter = values_of_resource_type.find("global");
+ if (iter != values_of_resource_type.end()) {
+ rt = resource_type::kGlobal;
+ break;
+ }
+ iter = values_of_resource_type.find("database");
+ if (iter != values_of_resource_type.end()) {
+ resource_policy.database_names = iter->second;
+ rt = resource_type::kDatabase;
+ break;
+ }
+ } else if (values_of_resource_type.size() == 2) {
+ auto iter1 = values_of_resource_type.find("database");
+ auto iter2 = values_of_resource_type.find("table");
+ if (iter1 != values_of_resource_type.end() &&
+ iter2 != values_of_resource_type.end()) {
+ resource_policy.database_names = iter1->second;
+ resource_policy.table_names = iter2->second;
+ rt = resource_type::kDatabaseTable;
+ break;
+ }
+ }
+ return dsn::ERR_RANGER_PARSE_ACL;
+ } while (false);
+
+ parse_policies_from_json(policy["policyItems"], resource_policy.policies.allow_policies);
+ parse_policies_from_json(policy["denyPolicyItems"], resource_policy.policies.deny_policies);
+ parse_policies_from_json(policy["allowExceptions"],
+ resource_policy.policies.allow_policies_exclude);
+ parse_policies_from_json(policy["denyExceptions"],
+ resource_policy.policies.deny_policies_exclude);
+
+ // 4. Add the ACL policy.
+ auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+ resource_policies({resource_policy}));
+ if (!ret.second) {
+ ret.first->second.emplace_back(resource_policy);
+ }
+ }
+
+ return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+ LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+ dsn::task_ptr sync_task = dsn::tasking::create_task(
+ LPC_USE_RANGER_ACCESS_CONTROL, &_tracker, [this]() { dump_and_sync_policies(); });
+ _meta_svc->get_remote_storage()->create_node(
+ _ranger_policy_meta_root,
+ LPC_USE_RANGER_ACCESS_CONTROL,
+ [this, sync_task](dsn::error_code err) {
+ if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+ LOG_DEBUG("Create Ranger policy meta root succeed.");
+ sync_task->enqueue();
+ return;
+ }
+ CHECK_EQ(err, dsn::ERR_TIMEOUT);
+ LOG_ERROR("Create Ranger policy meta root timeout, retry later.");
+ dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
+ &_tracker,
+ [this]() { start_to_dump_and_sync_policies(); },
+ 0,
+ kLoadRangerPolicyRetryDelayMs);
+ });
+}
+
+void ranger_resource_policy_manager::dump_and_sync_policies()
+{
+ LOG_DEBUG("Start to sync Ranger policies to remote storage.");
+
+ dump_policies_to_remote_storage();
+ LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+
+ update_cached_policies();
+ LOG_DEBUG("Update using resources policies succeed.");
+
+ if (dsn::ERR_OK != sync_policies_to_app_envs()) {
+ LOG_ERROR("Sync policies to app envs failed.");
+ }
+}
+
+void ranger_resource_policy_manager::dump_policies_to_remote_storage()
+{
+ dsn::blob value = json::json_forwarder<all_resource_policies>::encode(_all_resource_policies);
+ _meta_svc->get_remote_storage()->set_data(
+ _ranger_policy_meta_root, value, LPC_USE_RANGER_ACCESS_CONTROL, [this](dsn::error_code e) {
+ if (e == dsn::ERR_OK) {
+ LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+ return;
+ }
+ // The return error code is not 'ERR_TIMEOUT', use assert here.
+ CHECK_EQ(e, dsn::ERR_TIMEOUT);
+ LOG_ERROR("Dump Ranger policies to remote storage timeout, retry later.");
+ dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
+ &_tracker,
+ [this]() { dump_policies_to_remote_storage(); },
+ 0,
+ kLoadRangerPolicyRetryDelayMs);
+ });
+}
+
+void ranger_resource_policy_manager::update_cached_policies()
+{
+ {
+ utils::auto_write_lock l(_global_policies_lock);
+ _global_policies_cache.swap(_all_resource_policies[enum_to_string(resource_type::kGlobal)]);
+ // TODO(wanghao): provide a query method
+ }
+ {
+ utils::auto_write_lock l(_database_policies_lock);
+ _database_policies_cache.swap(
+ _all_resource_policies[enum_to_string(resource_type::kDatabase)]);
+ // TODO(wanghao): provide a query method
+ }
+}
+
+dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs()
+{
+ const auto &table_policies =
+ _all_resource_policies.find(enum_to_string(resource_type::kDatabaseTable));
+ if (table_policies == _all_resource_policies.end()) {
+ LOG_INFO("DATABASE_TABLE level policy is empty, skip to sync app envs.");
+ return dsn::ERR_OK;
+ }
+
+ dsn::replication::configuration_list_apps_response list_resp;
+ dsn::replication::configuration_list_apps_request list_req;
+ list_req.status = dsn::app_status::AS_AVAILABLE;
+ _meta_svc->get_server_state()->list_apps(list_req, list_resp);
+ ERR_LOG_AND_RETURN_NOT_OK(list_resp.err, "list_apps failed.");
+ for (const auto &app : list_resp.infos) {
+ std::string database_name = get_database_name_from_app_name(app.app_name);
+ // Use "*" for table name of invalid Ranger rules to match datdabase resources.
+ if (database_name.empty()) {
+ database_name = "*";
+ }
+ std::string table_name = get_table_name_from_app_name(app.app_name);
+
+ auto req = dsn::make_unique<dsn::replication::configuration_update_app_env_request>();
+ req->__set_app_name(app.app_name);
+ req->__set_keys(
+ {dsn::replication::replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES});
+ bool is_policy_matched = false;
+ for (const auto &policy : table_policies->second) {
+ if (policy.database_names.count(database_name) == 0) {
+ continue;
+ }
+
+ // if table name does not conform to the naming rules(database_name.table_name),
+ // database is defined by "*" in ranger for acl matching
+ if (policy.table_names.count("*") != 0 || policy.table_names.count(table_name) != 0) {
+ is_policy_matched = true;
+ req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_SET);
+ req->__set_values(
+ {json::json_forwarder<acl_policies>::encode(policy.policies).to_string()});
+
+ dsn::replication::update_app_env_rpc rpc(std::move(req),
+ LPC_USE_RANGER_ACCESS_CONTROL);
+ _meta_svc->get_server_state()->set_app_envs(rpc);
+ ERR_LOG_AND_RETURN_NOT_OK(rpc.response().err, "set_app_envs failed.");
+ break;
+ }
+ }
+
+ // There is no matched policy, clear app Ranger policy
+ if (!is_policy_matched) {
+ req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_DEL);
+
+ dsn::replication::update_app_env_rpc rpc(std::move(req), LPC_USE_RANGER_ACCESS_CONTROL);
+ _meta_svc->get_server_state()->del_app_envs(rpc);
+ ERR_LOG_AND_RETURN_NOT_OK(rpc.response().err, "del_app_envs failed.");
+ }
+ }
+
+ LOG_DEBUG("Sync policies to app envs succeeded.");
+ return dsn::ERR_OK;
+}
+
+std::string get_database_name_from_app_name(const std::string &app_name)
+{
+ std::string prefix = utils::find_string_prefix(app_name, '.');
+ if (prefix.empty() || prefix == app_name) {
+ return std::string();
+ }
+
+ return prefix;
+}
+
+std::string get_table_name_from_app_name(const std::string &app_name)
+{
+ std::string database_name = get_database_name_from_app_name(app_name);
+ return database_name.empty() ? app_name : app_name.substr(database_name.size() + 1);
+}
} // namespace ranger
} // namespace dsn
diff --git a/src/runtime/ranger/ranger_resource_policy_manager.h b/src/runtime/ranger/ranger_resource_policy_manager.h
index 5b584a275..5c920d912 100644
--- a/src/runtime/ranger/ranger_resource_policy_manager.h
+++ b/src/runtime/ranger/ranger_resource_policy_manager.h
@@ -28,7 +28,10 @@
#include "ranger_resource_policy.h"
#include "rapidjson/document.h"
#include "runtime/ranger/access_type.h"
+#include "runtime/task/task_tracker.h"
#include "utils/enum_helper.h"
+#include "utils/error_code.h"
+#include "utils/synchronize.h"
namespace dsn {
@@ -73,12 +76,48 @@ private:
static void parse_policies_from_json(const rapidjson::Value &data,
std::vector<policy_item> &policies);
+ // Update policies from Ranger service.
+ dsn::error_code update_policies_from_ranger_service();
+
+ // Pull policies in JSON format from Ranger service.
+ dsn::error_code pull_policies_from_ranger_service(std::string *ranger_policies) const;
+
+ // Load policies from JSON formated string.
+ dsn::error_code load_policies_from_json(const std::string &data);
+
+ // Create the path to save policies in remote storage, and update using resources policies.
+ void start_to_dump_and_sync_policies();
+
+ // Sync policies in use from Ranger service.
+ void dump_and_sync_policies();
+
+ // Dump policies to remote storage.
+ void dump_policies_to_remote_storage();
+
+ // Update the cached global/database resources policies.
+ void update_cached_policies();
+
+ // Sync policies to app_envs(REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES).
+ dsn::error_code sync_policies_to_app_envs();
+
private:
+ dsn::task_tracker _tracker;
+
// The path where policies to be saved in remote storage.
std::string _ranger_policy_meta_root;
replication::meta_service *_meta_svc;
+ // The cache of the global resources policies, it's a subset of '_all_resource_policies'.
+ utils::rw_lock_nr _global_policies_lock; // [
+ resource_policies _global_policies_cache;
+ // ]
+
+ // The cache of the database resources policies, it's a subset of '_all_resource_policies'.
+ utils::rw_lock_nr _database_policies_lock; // [
+ resource_policies _database_policies_cache;
+ // ]
+
// The access type of RPCs which access global level resources.
access_type_of_rpc_code _ac_type_of_global_rpcs;
@@ -86,7 +125,7 @@ private:
access_type_of_rpc_code _ac_type_of_database_rpcs;
// The Ranger policy version to determine whether to update.
- // int _local_policy_version;
+ int _local_policy_version;
// All Ranger ACL policies.
all_resource_policies _all_resource_policies;
@@ -95,5 +134,15 @@ private:
FRIEND_TEST(ranger_resource_policy_manager_test, parse_policies_from_json_for_test);
};
+
+// Try to get the database name of 'app_name'.
+// When using Ranger for ACL, the constraint table naming rule is
+// "{database_name}.{table_name}", use "." to split database name and table name.
+// Return an empty string if 'app_name' is not a valid Ranger rule table name.
+std::string get_database_name_from_app_name(const std::string &app_name);
+
+// Try to get the table_name of 'app_name'.
+// Return 'app_name' if 'app_name' is not a valid Ranger rule table name.
+std::string get_table_name_from_app_name(const std::string &app_name);
} // namespace ranger
} // namespace dsn
diff --git a/src/runtime/test/ranger_resource_policy_manager_test.cpp b/src/runtime/test/ranger_resource_policy_manager_test.cpp
index b8ca83cad..475808e86 100644
--- a/src/runtime/test/ranger_resource_policy_manager_test.cpp
+++ b/src/runtime/test/ranger_resource_policy_manager_test.cpp
@@ -215,5 +215,46 @@ TEST(ranger_resource_policy_manager_test, ranger_resource_policy_serialized_test
EXPECT_EQ(test.expected_result, actual_result);
}
}
+
+TEST(ranger_resource_policy_manager_test, get_database_name_from_app_name_test)
+{
+ struct test_case
+ {
+ std::string app_name;
+ std::string expected_result;
+ } tests[] = {{"", ""},
+ {".", ""},
+ {"...", ""},
+ {"database_name.", "database_name"},
+ {".table_name", ""},
+ {"app_name", ""},
+ {"database_name.table_name", "database_name"},
+ {"a.b.c", "a"}};
+ for (const auto &test : tests) {
+ auto actual_result = get_database_name_from_app_name(test.app_name);
+ EXPECT_EQ(test.expected_result, actual_result);
+ }
+}
+
+TEST(ranger_resource_policy_manager_test, get_table_name_from_app_name_test)
+{
+ struct test_case
+ {
+ std::string app_name;
+ std::string expected_result;
+ } tests[] = {{"", ""},
+ {".", "."},
+ {"...", "..."},
+ {"database_name.", ""},
+ {".table_name", ".table_name"},
+ {"app_name", "app_name"},
+ {"database_name.table_name", "table_name"},
+ {"a.b.c", "b.c"}};
+ for (const auto &test : tests) {
+ auto actual_result = get_table_name_from_app_name(test.app_name);
+ EXPECT_EQ(test.expected_result, actual_result);
+ }
+}
+
} // namespace ranger
} // namespace dsn
diff --git a/src/utils/error_code.h b/src/utils/error_code.h
index c9fb34437..613a67efe 100644
--- a/src/utils/error_code.h
+++ b/src/utils/error_code.h
@@ -168,4 +168,8 @@ DEFINE_ERR_CODE(ERR_PARENT_PARTITION_MISUSED)
DEFINE_ERR_CODE(ERR_CHILD_NOT_READY)
DEFINE_ERR_CODE(ERR_DISK_INSUFFICIENT)
DEFINE_ERR_CODE(ERR_RETRY_EXHAUSTED)
+
+DEFINE_ERR_CODE(ERR_SYNC_RANGER_POLICIES_FAILED)
+DEFINE_ERR_CODE(ERR_RANGER_PARSE_ACL)
+DEFINE_ERR_CODE(ERR_RANGER_POLICIES_NO_NEED_UPDATE)
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org