You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/03/17 12:36:28 UTC

[incubator-pegasus] branch master updated: feat(Ranger): Pull policies from the Ranger Service and update using resources policies (#1388)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 034cb9d10 feat(Ranger): Pull policies from the Ranger Service and update using resources policies (#1388)
034cb9d10 is described below

commit 034cb9d107e197c06f909de467258b3503cfb338
Author: WHBANG <38...@users.noreply.github.com>
AuthorDate: Fri Mar 17 20:36:21 2023 +0800

    feat(Ranger): Pull policies from the Ranger Service and update using resources policies (#1388)
    
    https://github.com/apache/incubator-pegasus/issues/1054
    
    This patch implements how to pull policies from the Ranger Service
    and dump policies to remote storage.
    
    - Pull policies in JSON format from the Ranger service and parse
      policies from the JSON-formatted string.
    - Create the path to save policies in remote storage, and update
      the resource policies in use.
    - Dump policies to remote storage.
    - Sync policies to app envs.
    - Update the cached global/database resource policies.
---
 src/common/replica_envs.h                          |   1 +
 src/common/replication.codes.h                     |   1 +
 src/common/replication_common.cpp                  |   2 +
 src/meta/app_env_validator.cpp                     |   3 +-
 .../ranger/ranger_resource_policy_manager.cpp      | 409 ++++++++++++++++++++-
 .../ranger/ranger_resource_policy_manager.h        |  51 ++-
 .../test/ranger_resource_policy_manager_test.cpp   |  41 +++
 src/utils/error_code.h                             |   4 +
 8 files changed, 509 insertions(+), 3 deletions(-)

diff --git a/src/common/replica_envs.h b/src/common/replica_envs.h
index f4c0d5829..4db367a7f 100644
--- a/src/common/replica_envs.h
+++ b/src/common/replica_envs.h
@@ -56,6 +56,7 @@ public:
     static const std::string MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION;
     static const std::string BUSINESS_INFO;
     static const std::string REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS;
+    static const std::string REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES;
     static const std::string READ_QPS_THROTTLING;
     static const std::string READ_SIZE_THROTTLING;
     static const std::string BACKUP_REQUEST_QPS_THROTTLING;
diff --git a/src/common/replication.codes.h b/src/common/replication.codes.h
index 73df12497..72739c27e 100644
--- a/src/common/replication.codes.h
+++ b/src/common/replication.codes.h
@@ -131,6 +131,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_START_MANUAL_COMPACT, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE_RPC(RPC_CM_GET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE_RPC(RPC_CM_SET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
+MAKE_EVENT_CODE(LPC_USE_RANGER_ACCESS_CONTROL, TASK_PRIORITY_COMMON)
 #undef CURRENT_THREAD_POOL
 
 #define CURRENT_THREAD_POOL THREAD_POOL_META_STATE
diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp
index 4ab27a74a..63617797a 100644
--- a/src/common/replication_common.cpp
+++ b/src/common/replication_common.cpp
@@ -384,6 +384,8 @@ const std::string replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED("replica.rocksdb_blo
 const std::string replica_envs::BUSINESS_INFO("business.info");
 const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS(
     "replica_access_controller.allowed_users");
+const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES(
+    "replica_access_controller.ranger_policies");
 const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling");
 const std::string replica_envs::READ_SIZE_THROTTLING("replica.read_throttling_by_size");
 const std::string
diff --git a/src/meta/app_env_validator.cpp b/src/meta/app_env_validator.cpp
index f541ee271..41f9c299c 100644
--- a/src/meta/app_env_validator.cpp
+++ b/src/meta/app_env_validator.cpp
@@ -212,7 +212,8 @@ void app_env_validator::register_all_validators()
         {replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, nullptr},
         {replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr},
         {replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, nullptr},
-        {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr}};
+        {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
+        {replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr}};
 }
 
 } // namespace replication
diff --git a/src/runtime/ranger/ranger_resource_policy_manager.cpp b/src/runtime/ranger/ranger_resource_policy_manager.cpp
index c9025ca51..3c30e262e 100644
--- a/src/runtime/ranger/ranger_resource_policy_manager.cpp
+++ b/src/runtime/ranger/ranger_resource_policy_manager.cpp
@@ -17,19 +17,61 @@
 
 #include <ctype.h>
 #include <algorithm>
+#include <chrono>
+#include <iosfwd>
+#include <memory>
 #include <unordered_set>
 #include <utility>
 
+// Disable class-memaccess warning to facilitate compilation with gcc>7
+// https://github.com/Tencent/rapidjson/issues/1700
+#pragma GCC diagnostic push
+#if defined(__GNUC__) && __GNUC__ >= 8
+#pragma GCC diagnostic ignored "-Wclass-memaccess"
+#endif
+#include <rapidjson/document.h>
+
+#pragma GCC diagnostic pop
+
+#include "common/replica_envs.h"
+#include "common/replication.codes.h"
+#include "common/replication_common.h"
+#include "dsn.layer2_types.h"
+#include "fmt/core.h"
 #include "meta/meta_options.h"
 #include "meta/meta_service.h"
+#include "meta/meta_state_service.h"
+#include "meta/server_state.h"
+#include "meta_admin_types.h"
 #include "ranger_resource_policy_manager.h"
+#include "rapidjson/allocators.h"
 #include "runtime/ranger/ranger_resource_policy.h"
+#include "runtime/task/async_calls.h"
+#include "runtime/task/task.h"
 #include "runtime/task/task_code.h"
+#include "utils/blob.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
+#include "utils/process_utils.h"
+#include "utils/smart_pointers.h"
+#include "utils/strings.h"
 
 namespace dsn {
 namespace ranger {
 
+DSN_DEFINE_uint32(security,
+                  update_ranger_policy_interval_sec,
+                  5,
+                  "The interval seconds of meta "
+                  "server to pull the latest "
+                  "access control policy from "
+                  "Ranger service.");
+DSN_DEFINE_string(ranger, ranger_service_url, "", "Apache Ranger service url.");
+DSN_DEFINE_string(ranger,
+                  ranger_service_name,
+                  "",
+                  "The name of the policies defined in the Ranger service.");
+
 #define RETURN_ERR_IF_MISSING_MEMBER(obj, member)                                                  \
     do {                                                                                           \
         if (!obj.IsObject() || !obj.HasMember(member)) {                                           \
@@ -83,9 +125,11 @@ const std::map<std::string, access_type> kAccessTypeMaping({{"READ", access_type
                                                             {"CONTROL", access_type::kControl}});
 } // anonymous namespace
 
+const std::chrono::milliseconds kLoadRangerPolicyRetryDelayMs(10000);
+
 ranger_resource_policy_manager::ranger_resource_policy_manager(
     dsn::replication::meta_service *meta_svc)
-    : _meta_svc(meta_svc) //, _local_policy_version(0)
+    : _meta_svc(meta_svc), _local_policy_version(-1)
 {
     _ranger_policy_meta_root = dsn::replication::meta_options::concat_path_unix_style(
         _meta_svc->cluster_root(), "ranger_policy_meta_root");
@@ -171,5 +215,368 @@ void ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
         policies.emplace_back(pi);
     }
 }
+
+dsn::error_code ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+    std::string ranger_policies;
+    ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+                              "Pull Ranger policies failed.");
+    LOG_DEBUG("Pull Ranger policies success.");
+
+    auto err_code = load_policies_from_json(ranger_policies);
+    if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+        LOG_DEBUG("Skip to update local policies.");
+        // For the newly created table, its app envs must be empty. This needs to be executed
+        // periodically to update the table's app envs, regardless of whether the Ranger policy is
+        // updated or not.
+        ERR_LOG_AND_RETURN_NOT_OK(sync_policies_to_app_envs(), "Sync policies to app envs failed.");
+        return dsn::ERR_OK;
+    }
+    ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+    start_to_dump_and_sync_policies();
+
+    return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::pull_policies_from_ranger_service(
+    std::string *ranger_policies) const
+{
+    std::string cmd =
+        fmt::format("curl {}/{}", FLAGS_ranger_service_url, FLAGS_ranger_service_name);
+    std::stringstream resp;
+    if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+        return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+    }
+
+    *ranger_policies = resp.str();
+    return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const std::string &data)
+{
+    // An example of the Ranger policies pulled from the Ranger service.
+    /*
+    {
+        "serviceName": "PEGASUS1",
+        "serviceId": 1069,
+        "policyVersion": 60,
+        "policyUpdateTime": 1673254471000,
+        "policies": [{
+            "id": 5334,
+            "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+            "isEnabled": true,
+            "version": 13,
+            "service": "PEGASUS1",
+            "name": "all - database",
+            "policyType": 0,
+            "policyPriority": 0,
+            "description": "Policy for all - database",
+            "isAuditEnabled": true,
+            "resources": {
+                "database": {
+                    "values": ["PEGASUS1"],
+                    "isExcludes": false,
+                    "isRecursive": true
+                }
+            },
+            "policyItems": [{
+                "accesses": [{
+                    "type": "create",
+                    "isAllowed": true
+                }, {
+                    "type": "drop",
+                    "isAllowed": true
+                }, {
+                    "type": "control",
+                    "isAllowed": true
+                }, {
+                    "type": "metadata",
+                    "isAllowed": true
+                }, {
+                    "type": "list",
+                    "isAllowed": true
+                }],
+                "users": ["PEGASUS1"],
+                "groups": [],
+                "roles": [],
+                "conditions": [],
+                "delegateAdmin": true
+            }],
+            "denyPolicyItems": [],
+            "allowExceptions": [],
+            "denyExceptions": [],
+            "dataMaskPolicyItems": [],
+            "rowFilterPolicyItems": [],
+            "serviceType": "pegasus",
+            "options": {},
+            "validitySchedules": [],
+            "policyLabels": [],
+            "zoneName": "",
+            "isDenyAllElse": false
+        }],
+        "auditMode": "audit-default",
+        "serviceConfig": {}
+    }
+    */
+    rapidjson::Document doc;
+    doc.Parse(data.c_str());
+
+    // Check if it is needed to update policies.
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+    int remote_policy_version = doc["policyVersion"].GetInt();
+    if (_local_policy_version == remote_policy_version) {
+        LOG_DEBUG("Ranger policy version: {}, no need to update.", _local_policy_version);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version > remote_policy_version) {
+        LOG_WARNING("Local Ranger policy version ({}) is larger than remote version ({}), please "
+                    "check Ranger services ({}).",
+                    _local_policy_version,
+                    remote_policy_version,
+                    FLAGS_ranger_service_name);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    _local_policy_version = remote_policy_version;
+
+    // Update policies.
+    _all_resource_policies.clear();
+
+    // TODO(wanghao): it's optional
+    // Provide a DATABASE default policy for legacy tables.
+    // ranger_resource_policy default_database_policy;
+    // ranger_resource_policy::create_default_database_policy(default_database_policy);
+    // _all_resource_policies[enum_to_string(resource_type::kDatabase)] = {default_database_policy};
+
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+    const rapidjson::Value &policies = doc["policies"];
+    RETURN_ERR_IF_NOT_ARRAY(policies);
+    for (const auto &policy : policies.GetArray()) {
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+        // 1. Check if the policy is enabled or not.
+        if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+            continue;
+        }
+
+        // 2. Parse resource type.
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+        std::map<std::string, std::unordered_set<std::string>> values_of_resource_type;
+        for (const auto &resource : policy["resources"].GetObject()) {
+            RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+            RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+            std::unordered_set<std::string> values;
+            for (const auto &v : (resource.value)["values"].GetArray()) {
+                values.insert(v.GetString());
+            }
+            values_of_resource_type.emplace(std::make_pair(resource.name.GetString(), values));
+        }
+
+        // 3. Construct ACL policy.
+        ranger_resource_policy resource_policy;
+        CONTINUE_IF_MISSING_MEMBER(policy, "name");
+        resource_policy.name = policy["name"].GetString();
+
+        resource_type rt = resource_type::kUnknown;
+        do {
+            // TODO(wanghao): refactor the following code
+            // parse Ranger policies json string into `values_of_resource_type`, distinguish
+            // resource types by `values_of_resource_type.size()`
+            if (values_of_resource_type.size() == 1) {
+                auto iter = values_of_resource_type.find("global");
+                if (iter != values_of_resource_type.end()) {
+                    rt = resource_type::kGlobal;
+                    break;
+                }
+                iter = values_of_resource_type.find("database");
+                if (iter != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter->second;
+                    rt = resource_type::kDatabase;
+                    break;
+                }
+            } else if (values_of_resource_type.size() == 2) {
+                auto iter1 = values_of_resource_type.find("database");
+                auto iter2 = values_of_resource_type.find("table");
+                if (iter1 != values_of_resource_type.end() &&
+                    iter2 != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter1->second;
+                    resource_policy.table_names = iter2->second;
+                    rt = resource_type::kDatabaseTable;
+                    break;
+                }
+            }
+            return dsn::ERR_RANGER_PARSE_ACL;
+        } while (false);
+
+        parse_policies_from_json(policy["policyItems"], resource_policy.policies.allow_policies);
+        parse_policies_from_json(policy["denyPolicyItems"], resource_policy.policies.deny_policies);
+        parse_policies_from_json(policy["allowExceptions"],
+                                 resource_policy.policies.allow_policies_exclude);
+        parse_policies_from_json(policy["denyExceptions"],
+                                 resource_policy.policies.deny_policies_exclude);
+
+        // 4. Add the ACL policy.
+        auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+                                                  resource_policies({resource_policy}));
+        if (!ret.second) {
+            ret.first->second.emplace_back(resource_policy);
+        }
+    }
+
+    return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+    dsn::task_ptr sync_task = dsn::tasking::create_task(
+        LPC_USE_RANGER_ACCESS_CONTROL, &_tracker, [this]() { dump_and_sync_policies(); });
+    _meta_svc->get_remote_storage()->create_node(
+        _ranger_policy_meta_root,
+        LPC_USE_RANGER_ACCESS_CONTROL,
+        [this, sync_task](dsn::error_code err) {
+            if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+                LOG_DEBUG("Create Ranger policy meta root succeed.");
+                sync_task->enqueue();
+                return;
+            }
+            CHECK_EQ(err, dsn::ERR_TIMEOUT);
+            LOG_ERROR("Create Ranger policy meta root timeout, retry later.");
+            dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
+                                  &_tracker,
+                                  [this]() { start_to_dump_and_sync_policies(); },
+                                  0,
+                                  kLoadRangerPolicyRetryDelayMs);
+        });
+}
+
+void ranger_resource_policy_manager::dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to sync Ranger policies to remote storage.");
+
+    dump_policies_to_remote_storage();
+    LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+
+    update_cached_policies();
+    LOG_DEBUG("Update using resources policies succeed.");
+
+    if (dsn::ERR_OK != sync_policies_to_app_envs()) {
+        LOG_ERROR("Sync policies to app envs failed.");
+    }
+}
+
+void ranger_resource_policy_manager::dump_policies_to_remote_storage()
+{
+    dsn::blob value = json::json_forwarder<all_resource_policies>::encode(_all_resource_policies);
+    _meta_svc->get_remote_storage()->set_data(
+        _ranger_policy_meta_root, value, LPC_USE_RANGER_ACCESS_CONTROL, [this](dsn::error_code e) {
+            if (e == dsn::ERR_OK) {
+                LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+                return;
+            }
+            // Any error other than 'ERR_TIMEOUT' is unexpected here, so assert on it.
+            CHECK_EQ(e, dsn::ERR_TIMEOUT);
+            LOG_ERROR("Dump Ranger policies to remote storage timeout, retry later.");
+            dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
+                                  &_tracker,
+                                  [this]() { dump_policies_to_remote_storage(); },
+                                  0,
+                                  kLoadRangerPolicyRetryDelayMs);
+        });
+}
+
+void ranger_resource_policy_manager::update_cached_policies()
+{
+    {
+        utils::auto_write_lock l(_global_policies_lock);
+        _global_policies_cache.swap(_all_resource_policies[enum_to_string(resource_type::kGlobal)]);
+        // TODO(wanghao): provide a query method
+    }
+    {
+        utils::auto_write_lock l(_database_policies_lock);
+        _database_policies_cache.swap(
+            _all_resource_policies[enum_to_string(resource_type::kDatabase)]);
+        // TODO(wanghao): provide a query method
+    }
+}
+
+dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs()
+{
+    const auto &table_policies =
+        _all_resource_policies.find(enum_to_string(resource_type::kDatabaseTable));
+    if (table_policies == _all_resource_policies.end()) {
+        LOG_INFO("DATABASE_TABLE level policy is empty, skip to sync app envs.");
+        return dsn::ERR_OK;
+    }
+
+    dsn::replication::configuration_list_apps_response list_resp;
+    dsn::replication::configuration_list_apps_request list_req;
+    list_req.status = dsn::app_status::AS_AVAILABLE;
+    _meta_svc->get_server_state()->list_apps(list_req, list_resp);
+    ERR_LOG_AND_RETURN_NOT_OK(list_resp.err, "list_apps failed.");
+    for (const auto &app : list_resp.infos) {
+        std::string database_name = get_database_name_from_app_name(app.app_name);
+        // Use "*" as the database name of invalidly named tables to match database resources.
+        if (database_name.empty()) {
+            database_name = "*";
+        }
+        std::string table_name = get_table_name_from_app_name(app.app_name);
+
+        auto req = dsn::make_unique<dsn::replication::configuration_update_app_env_request>();
+        req->__set_app_name(app.app_name);
+        req->__set_keys(
+            {dsn::replication::replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES});
+        bool is_policy_matched = false;
+        for (const auto &policy : table_policies->second) {
+            if (policy.database_names.count(database_name) == 0) {
+                continue;
+            }
+
+            // If the app name does not conform to the naming rule "database_name.table_name",
+            // its database is expected to be defined as "*" in Ranger for ACL matching.
+            if (policy.table_names.count("*") != 0 || policy.table_names.count(table_name) != 0) {
+                is_policy_matched = true;
+                req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_SET);
+                req->__set_values(
+                    {json::json_forwarder<acl_policies>::encode(policy.policies).to_string()});
+
+                dsn::replication::update_app_env_rpc rpc(std::move(req),
+                                                         LPC_USE_RANGER_ACCESS_CONTROL);
+                _meta_svc->get_server_state()->set_app_envs(rpc);
+                ERR_LOG_AND_RETURN_NOT_OK(rpc.response().err, "set_app_envs failed.");
+                break;
+            }
+        }
+
+        // There is no matched policy, clear app Ranger policy
+        if (!is_policy_matched) {
+            req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_DEL);
+
+            dsn::replication::update_app_env_rpc rpc(std::move(req), LPC_USE_RANGER_ACCESS_CONTROL);
+            _meta_svc->get_server_state()->del_app_envs(rpc);
+            ERR_LOG_AND_RETURN_NOT_OK(rpc.response().err, "del_app_envs failed.");
+        }
+    }
+
+    LOG_DEBUG("Sync policies to app envs succeeded.");
+    return dsn::ERR_OK;
+}
+
+std::string get_database_name_from_app_name(const std::string &app_name)
+{
+    std::string prefix = utils::find_string_prefix(app_name, '.');
+    if (prefix.empty() || prefix == app_name) {
+        return std::string();
+    }
+
+    return prefix;
+}
+
+std::string get_table_name_from_app_name(const std::string &app_name)
+{
+    std::string database_name = get_database_name_from_app_name(app_name);
+    return database_name.empty() ? app_name : app_name.substr(database_name.size() + 1);
+}
 } // namespace ranger
 } // namespace dsn
diff --git a/src/runtime/ranger/ranger_resource_policy_manager.h b/src/runtime/ranger/ranger_resource_policy_manager.h
index 5b584a275..5c920d912 100644
--- a/src/runtime/ranger/ranger_resource_policy_manager.h
+++ b/src/runtime/ranger/ranger_resource_policy_manager.h
@@ -28,7 +28,10 @@
 #include "ranger_resource_policy.h"
 #include "rapidjson/document.h"
 #include "runtime/ranger/access_type.h"
+#include "runtime/task/task_tracker.h"
 #include "utils/enum_helper.h"
+#include "utils/error_code.h"
+#include "utils/synchronize.h"
 
 namespace dsn {
 
@@ -73,12 +76,48 @@ private:
     static void parse_policies_from_json(const rapidjson::Value &data,
                                          std::vector<policy_item> &policies);
 
+    // Update policies from Ranger service.
+    dsn::error_code update_policies_from_ranger_service();
+
+    // Pull policies in JSON format from Ranger service.
+    dsn::error_code pull_policies_from_ranger_service(std::string *ranger_policies) const;
+
+    // Load policies from a JSON-formatted string.
+    dsn::error_code load_policies_from_json(const std::string &data);
+
+    // Create the path to save policies in remote storage, and update the resource policies in use.
+    void start_to_dump_and_sync_policies();
+
+    // Dump policies to remote storage, update the cached policies, and sync them to app envs.
+    void dump_and_sync_policies();
+
+    // Dump policies to remote storage.
+    void dump_policies_to_remote_storage();
+
+    // Update the cached global/database resource policies.
+    void update_cached_policies();
+
+    // Sync policies to app_envs(REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES).
+    dsn::error_code sync_policies_to_app_envs();
+
 private:
+    dsn::task_tracker _tracker;
+
     // The path where policies to be saved in remote storage.
     std::string _ranger_policy_meta_root;
 
     replication::meta_service *_meta_svc;
 
+    // The cache of the global resources policies, it's a subset of '_all_resource_policies'.
+    utils::rw_lock_nr _global_policies_lock; // [
+    resource_policies _global_policies_cache;
+    // ]
+
+    // The cache of the database resources policies, it's a subset of '_all_resource_policies'.
+    utils::rw_lock_nr _database_policies_lock; // [
+    resource_policies _database_policies_cache;
+    // ]
+
     // The access type of RPCs which access global level resources.
     access_type_of_rpc_code _ac_type_of_global_rpcs;
 
@@ -86,7 +125,7 @@ private:
     access_type_of_rpc_code _ac_type_of_database_rpcs;
 
     // The Ranger policy version to determine whether to update.
-    //    int _local_policy_version;
+    int _local_policy_version;
 
     // All Ranger ACL policies.
     all_resource_policies _all_resource_policies;
@@ -95,5 +134,15 @@ private:
 
     FRIEND_TEST(ranger_resource_policy_manager_test, parse_policies_from_json_for_test);
 };
+
+// Try to get the database name of 'app_name'.
+// When using Ranger for ACL, table names are required to follow the naming rule
+// "{database_name}.{table_name}", where "." separates the database name and the table name.
+// Return an empty string if 'app_name' is not a valid Ranger rule table name.
+std::string get_database_name_from_app_name(const std::string &app_name);
+
+// Try to get the table_name of 'app_name'.
+// Return 'app_name' if 'app_name' is not a valid Ranger rule table name.
+std::string get_table_name_from_app_name(const std::string &app_name);
 } // namespace ranger
 } // namespace dsn
diff --git a/src/runtime/test/ranger_resource_policy_manager_test.cpp b/src/runtime/test/ranger_resource_policy_manager_test.cpp
index b8ca83cad..475808e86 100644
--- a/src/runtime/test/ranger_resource_policy_manager_test.cpp
+++ b/src/runtime/test/ranger_resource_policy_manager_test.cpp
@@ -215,5 +215,46 @@ TEST(ranger_resource_policy_manager_test, ranger_resource_policy_serialized_test
         EXPECT_EQ(test.expected_result, actual_result);
     }
 }
+
+TEST(ranger_resource_policy_manager_test, get_database_name_from_app_name_test)
+{
+    struct test_case
+    {
+        std::string app_name;
+        std::string expected_result;
+    } tests[] = {{"", ""},
+                 {".", ""},
+                 {"...", ""},
+                 {"database_name.", "database_name"},
+                 {".table_name", ""},
+                 {"app_name", ""},
+                 {"database_name.table_name", "database_name"},
+                 {"a.b.c", "a"}};
+    for (const auto &test : tests) {
+        auto actual_result = get_database_name_from_app_name(test.app_name);
+        EXPECT_EQ(test.expected_result, actual_result);
+    }
+}
+
+TEST(ranger_resource_policy_manager_test, get_table_name_from_app_name_test)
+{
+    struct test_case
+    {
+        std::string app_name;
+        std::string expected_result;
+    } tests[] = {{"", ""},
+                 {".", "."},
+                 {"...", "..."},
+                 {"database_name.", ""},
+                 {".table_name", ".table_name"},
+                 {"app_name", "app_name"},
+                 {"database_name.table_name", "table_name"},
+                 {"a.b.c", "b.c"}};
+    for (const auto &test : tests) {
+        auto actual_result = get_table_name_from_app_name(test.app_name);
+        EXPECT_EQ(test.expected_result, actual_result);
+    }
+}
+
 } // namespace ranger
 } // namespace dsn
diff --git a/src/utils/error_code.h b/src/utils/error_code.h
index c9fb34437..613a67efe 100644
--- a/src/utils/error_code.h
+++ b/src/utils/error_code.h
@@ -168,4 +168,8 @@ DEFINE_ERR_CODE(ERR_PARENT_PARTITION_MISUSED)
 DEFINE_ERR_CODE(ERR_CHILD_NOT_READY)
 DEFINE_ERR_CODE(ERR_DISK_INSUFFICIENT)
 DEFINE_ERR_CODE(ERR_RETRY_EXHAUSTED)
+
+DEFINE_ERR_CODE(ERR_SYNC_RANGER_POLICIES_FAILED)
+DEFINE_ERR_CODE(ERR_RANGER_PARSE_ACL)
+DEFINE_ERR_CODE(ERR_RANGER_POLICIES_NO_NEED_UPDATE)
 } // namespace dsn


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org